
Commit 682c4d0

williamhbaker and claude committed
allocator: coordinate graceful exit of signaled members
Allow scenarios like unattended upgrades where many members are signaled to exit at once. Members now mark themselves as exiting rather than zeroing their item limit, and the allocator gradually sheds their capacity from available excess slots, oldest first, capped so that enough members remain to satisfy replication.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0f34a43 commit 682c4d0

16 files changed: +810 -361 lines changed
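
Before the per-file diffs, here is a minimal, self-contained sketch of the shedding pass the commit message describes, paraphrasing the logic added to alloc_state.go below. The exitingMember type, the computeShed function, and its parameters are illustrative names for this sketch, not part of the repository's API; it assumes Go 1.21+ for the min/max builtins and the slices/cmp packages, which the diff itself also uses.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// exitingMember captures just the fields the shedding pass cares about.
type exitingMember struct {
	ItemLimit      int   // capacity the member advertises
	CreateRevision int64 // Etcd creation revision; lower means older
}

// computeShed grants shed capacity to exiting members, oldest first, from the
// cluster's excess slots (memberSlots - itemSlots), while leaving at least
// maxReplicationFactor members at full capacity. It returns the slots shed
// per exiting member, ordered oldest-first.
func computeShed(exiting []exitingMember, memberSlots, itemSlots, maxReplicationFactor, totalMembers int) []int {
	var excess = max(0, memberSlots-itemSlots)
	var maxShedding = max(0, totalMembers-maxReplicationFactor)

	// Oldest members (lowest CreateRevision) drain first.
	slices.SortFunc(exiting, func(a, b exitingMember) int {
		return cmp.Compare(a.CreateRevision, b.CreateRevision)
	})

	var shed = make([]int, len(exiting))
	for n, m := range exiting {
		if n == maxShedding || excess == 0 {
			break // retain enough members, or no excess left to grant.
		}
		shed[n] = min(m.ItemLimit, excess)
		excess -= shed[n]
	}
	return shed
}

func main() {
	// Mirrors TestShedCapacityExcessConstrained below: 4 members of capacity 3
	// (MemberSlots=12), 5 item slots at R:1, 3 exiting members.
	fmt.Println(computeShed(
		[]exitingMember{{3, 10}, {3, 20}, {3, 30}},
		12, 5, 1, 4,
	)) // [3 3 1]: the two oldest shed fully; the third gets the remaining excess.
}

The real implementation additionally subtracts shed slots from MemberSlots and folds them into NetworkHash, so that every allocator converges on the same effective capacities.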

allocator/alloc_state.go

Lines changed: 57 additions & 4 deletions
@@ -1,8 +1,10 @@
 package allocator
 
 import (
+	"cmp"
 	"hash/crc64"
 	"math"
+	"slices"
 	"strconv"
 	"strings"
 
@@ -51,6 +53,13 @@ type State struct {
 	// These share cardinality with |Members|.
 	MemberTotalCount   []int
 	MemberPrimaryCount []int
+
+	// Number of item slots to shed from each member's ItemLimit, reducing its
+	// effective capacity to ItemLimit - ShedCapacity. Exiting members are granted
+	// ShedCapacity from available excess cluster capacity, ordered by age
+	// (CreateRevision), so that the oldest exiting members drain first.
+	// Shares cardinality with |Members|.
+	ShedCapacity []int
 }
 
 // NewObservedState returns a *State instance which extracts and updates itself
@@ -89,12 +98,15 @@ func (s *State) observe() {
 	s.NetworkHash = 0
 	s.MemberTotalCount = make([]int, len(s.Members))
 	s.MemberPrimaryCount = make([]int, len(s.Members))
+	s.ShedCapacity = make([]int, len(s.Members))
 
 	// Walk Members to:
 	// * Group the set of ordered |Zones| across all Members.
 	// * Initialize |ZoneSlots|.
 	// * Initialize |MemberSlots|.
 	// * Initialize |NetworkHash|.
+	// * Collect indices of exiting members.
+	var exiting []int
 	for i := range s.Members {
 		var m = memberAt(s.Members, i)
 		var slots = m.ItemLimit()
@@ -114,6 +126,10 @@ func (s *State) observe() {
 
 		s.MemberSlots += slots
 		s.NetworkHash = foldCRC(s.NetworkHash, s.Members[i].Raw.Key, slots)
+
+		if m.IsExiting() {
+			exiting = append(exiting, i)
+		}
 	}
 
 	// Fetch |localMember| identified by |LocalKey|.
@@ -135,11 +151,14 @@ func (s *State) observe() {
 			return strings.Compare(itemAt(s.Items, l).ID, assignmentAt(s.Assignments, r).ItemID)
 		},
 	}
+
+	var maxReplicationFactor int
 	for cur, ok := it.Next(); ok; cur, ok = it.Next() {
 		var item = itemAt(s.Items, cur.Left)
 		var slots = item.DesiredReplication()
 
 		s.ItemSlots += slots
+		maxReplicationFactor = max(maxReplicationFactor, slots)
 		s.NetworkHash = foldCRC(s.NetworkHash, s.Items[cur.Left].Raw.Key, slots)
 
 		for r := cur.RightBegin; r != cur.RightEnd; r++ {
@@ -161,11 +180,39 @@ func (s *State) observe() {
 			}
 		}
 	}
+
+	// Compute ShedCapacity for exiting members. ShedCapacity is granted to
+	// exiting members, oldest first, up to each member's ItemLimit.
+	// Excess slots available to grant as ShedCapacity. When the cluster is
+	// overloaded (MemberSlots < ItemSlots), there is no excess to shed.
+	var excessSlots = max(0, s.MemberSlots-s.ItemSlots)
+	slices.SortFunc(exiting, func(a, b int) int {
+		return cmp.Compare(s.Members[a].Raw.CreateRevision, s.Members[b].Raw.CreateRevision)
+	})
+
+	// We must retain at least maxReplicationFactor members at full capacity
+	// to satisfy replication requirements. This is a numerical bound, not
+	// zone-aware: we shed the oldest members first, expecting that replacement
+	// members will restore zone diversity as they join.
+	var maxShedding = len(s.Members) - maxReplicationFactor
+	if maxShedding < 0 {
+		maxShedding = 0
+	}
+	for n, i := range exiting {
+		if n == maxShedding || excessSlots == 0 {
+			break
+		}
+		var shed = min(memberAt(s.Members, i).ItemLimit(), excessSlots)
+		s.ShedCapacity[i] = shed
+		s.MemberSlots -= shed
+		excessSlots -= shed
+		s.NetworkHash = foldCRC(s.NetworkHash, s.Members[i].Raw.Key, shed)
+	}
 }
 
 // shouldExit returns true iff the local Member is able to safely exit.
 func (s *State) shouldExit() bool {
-	return memberAt(s.Members, s.LocalMemberInd).ItemLimit() == 0 && len(s.LocalItems) == 0
+	return memberAt(s.Members, s.LocalMemberInd).IsExiting() && len(s.LocalItems) == 0
 }
 
 // isLeader returns true iff the local Member key is ordered first on
@@ -201,16 +248,22 @@ func (s *State) debugLog() {
 	}).Info("extracted State")
 }
 
+// memberEffectiveLimit returns the effective item limit for the member at
+// index |ind|, accounting for any ShedCapacity granted to exiting members.
+func (s *State) memberEffectiveLimit(ind int) int {
+	return memberAt(s.Members, ind).ItemLimit() - s.ShedCapacity[ind]
+}
+
 // memberLoadRatio maps an |assignment| to a Member "load ratio". Given all
 // |Members| and their corresponding |counts| (1:1 with |Members|),
 // memberLoadRatio maps |assignment| to a Member and, if found, returns the
-// ratio of the Member's index in |counts| to the Member's ItemLimit. If the
-// Member is not found, infinity is returned.
+// ratio of the Member's index in |counts| to the Member's effective item
+// limit. If the Member is not found, infinity is returned.
 func (s *State) memberLoadRatio(assignment keyspace.KeyValue, counts []int) float32 {
 	var a = assignment.Decoded.(Assignment)
 
 	if ind, found := s.Members.Search(MemberKey(s.KS, a.MemberZone, a.MemberSuffix)); found {
-		return float32(counts[ind]) / float32(memberAt(s.Members, ind).ItemLimit())
+		return float32(counts[ind]) / float32(s.memberEffectiveLimit(ind))
 	}
 	return math.MaxFloat32
 }

allocator/alloc_state_test.go

Lines changed: 200 additions & 2 deletions
@@ -109,7 +109,7 @@ func (s *AllocStateSuite) TestExitCondition(c *gc.C) {
 	buildAllocKeySpaceFixture(c, ctx, client)
 	defer etcdtest.Cleanup()
 
-	var _, err = client.Put(ctx, "/root/members/us-east#allowed-to-exit", `{"R": 0}`)
+	var _, err = client.Put(ctx, "/root/members/us-east#allowed-to-exit", `{"R": 0, "E": true}`)
 	c.Assert(err, gc.IsNil)
 
 	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
@@ -126,7 +126,7 @@ func (s *AllocStateSuite) TestExitCondition(c *gc.C) {
 	c.Check(states[1].shouldExit(), gc.Equals, true)
 
 	// While we're at it, expect |NetworkHash| changed with the new member.
-	c.Check(states[0].NetworkHash, gc.Equals, uint64(0xfce0237931d8c200))
+	c.Check(states[0].NetworkHash, gc.Equals, uint64(0x554c9f4e9605a7a1))
 }
 
 func (s *AllocStateSuite) TestLoadRatio(c *gc.C) {
@@ -153,6 +153,204 @@ func (s *AllocStateSuite) TestLoadRatio(c *gc.C) {
 	}
 }
 
+func (s *AllocStateSuite) TestLoadRatioWithShedding(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+
+	// m1 is exiting with R:5. m2 is not exiting with R:5.
+	// 2 items at R:1. MemberSlots=10, ItemSlots=2, excessSlots=8.
+	// m1 sheds its full capacity (5), so its effective limit is 0.
+	for _, kv := range [][2]string{
+		{"/root/items/item-a", `{"R": 1}`},
+		{"/root/items/item-b", `{"R": 1}`},
+
+		{"/root/members/zone-a#m1", `{"R": 5, "E": true}`},
+		{"/root/members/zone-a#m2", `{"R": 5, "E": false}`},
+
+		{"/root/assign/item-a#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-b#zone-a#m2#0", `consistent`},
+	} {
+		var _, err = client.Put(ctx, kv[0], kv[1])
+		c.Assert(err, gc.IsNil)
+	}
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	c.Check(state.ShedCapacity[0], gc.Equals, 5) // m1 fully shed.
+	c.Check(state.ShedCapacity[1], gc.Equals, 0)
+
+	// m1 has 1 assignment but effective capacity 0: load ratio is +Inf.
+	c.Check(state.memberLoadRatio(state.Assignments[0], state.MemberTotalCount), gc.Equals, float32(math.Inf(1)))
+	// m2 has 1 assignment and effective capacity 5: load ratio is 1/5.
+	c.Check(state.memberLoadRatio(state.Assignments[1], state.MemberTotalCount), gc.Equals, float32(1.0/5.0))
+}
+
+func (s *AllocStateSuite) TestShedCapacityAllAtOnce(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+
+	// 5 members, capacity 5 each. MemberSlots = 25.
+	// 2 items at R:1. ItemSlots = 2. excessSlots = 23.
+	// maxReplicationFactor = 1, maxShedding = 4.
+	// Both exiting members shed their full capacity.
+	for _, kv := range [][2]string{
+		{"/root/items/item-a", `{"R": 1}`},
+		{"/root/items/item-b", `{"R": 1}`},
+
+		{"/root/members/zone-a#m1", `{"R": 5, "E": true}`},
+		{"/root/members/zone-a#m2", `{"R": 5, "E": true}`},
+		{"/root/members/zone-a#m3", `{"R": 5, "E": false}`},
+		{"/root/members/zone-b#m4", `{"R": 5, "E": false}`},
+		{"/root/members/zone-b#m5", `{"R": 5, "E": false}`},
+
+		{"/root/assign/item-a#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-b#zone-b#m4#0", `consistent`},
+	} {
+		var _, err = client.Put(ctx, kv[0], kv[1])
+		c.Assert(err, gc.IsNil)
+	}
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	c.Check(state.ShedCapacity[0], gc.Equals, 5) // m1 sheds fully.
+	c.Check(state.ShedCapacity[1], gc.Equals, 5) // m2 sheds fully.
+	c.Check(state.ShedCapacity[2], gc.Equals, 0)
+	c.Check(state.ShedCapacity[3], gc.Equals, 0)
+	c.Check(state.ShedCapacity[4], gc.Equals, 0)
+}
+
+func (s *AllocStateSuite) TestShedCapacityReplicationConstrained(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+
+	// 4 members, capacity 10 each. MemberSlots = 40.
+	// 3 items: R:3, R:2, R:1. ItemSlots = 6. excessSlots = 34.
+	// maxReplicationFactor = 3, maxShedding = 1.
+	// m1 and m2 are both exiting, but only the oldest (m1) sheds.
+	for _, kv := range [][2]string{
+		{"/root/items/item-a", `{"R": 3}`},
+		{"/root/items/item-b", `{"R": 2}`},
+		{"/root/items/item-c", `{"R": 1}`},
+
+		{"/root/members/zone-a#m1", `{"R": 10, "E": true}`},
+		{"/root/members/zone-a#m2", `{"R": 10, "E": true}`},
+		{"/root/members/zone-b#m3", `{"R": 10, "E": false}`},
+		{"/root/members/zone-b#m4", `{"R": 10, "E": false}`},
+
+		{"/root/assign/item-a#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-a#zone-a#m2#1", `consistent`},
+		{"/root/assign/item-a#zone-b#m3#2", `consistent`},
+		{"/root/assign/item-b#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-b#zone-b#m4#1", `consistent`},
+		{"/root/assign/item-c#zone-b#m3#0", `consistent`},
+	} {
+		var _, err = client.Put(ctx, kv[0], kv[1])
+		c.Assert(err, gc.IsNil)
+	}
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	c.Check(state.ShedCapacity[0], gc.Equals, 10) // m1 sheds fully (oldest).
+	c.Check(state.ShedCapacity[1], gc.Equals, 0)  // m2 blocked by maxShedding.
+	c.Check(state.ShedCapacity[2], gc.Equals, 0)
+	c.Check(state.ShedCapacity[3], gc.Equals, 0)
+}
+
+func (s *AllocStateSuite) TestShedCapacityExcessConstrained(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+
+	// 4 members, capacity 3 each. MemberSlots = 12.
+	// 5 items at R:1. ItemSlots = 5. excessSlots = 7.
+	// maxReplicationFactor = 1, maxShedding = 3.
+	// 3 exiting members need 9 total shed, but only 7 excess available.
+	// m1: sheds 3 (full), m2: sheds 3 (full), m3: sheds 1 (partial).
+	for _, kv := range [][2]string{
+		{"/root/items/item-a", `{"R": 1}`},
+		{"/root/items/item-b", `{"R": 1}`},
+		{"/root/items/item-c", `{"R": 1}`},
+		{"/root/items/item-d", `{"R": 1}`},
+		{"/root/items/item-e", `{"R": 1}`},
+
+		{"/root/members/zone-a#m1", `{"R": 3, "E": true}`},
+		{"/root/members/zone-a#m2", `{"R": 3, "E": true}`},
+		{"/root/members/zone-b#m3", `{"R": 3, "E": true}`},
+		{"/root/members/zone-b#m4", `{"R": 3, "E": false}`},
+
+		{"/root/assign/item-a#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-b#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-c#zone-a#m2#0", `consistent`},
+		{"/root/assign/item-d#zone-b#m3#0", `consistent`},
+		{"/root/assign/item-e#zone-b#m4#0", `consistent`},
+	} {
+		var _, err = client.Put(ctx, kv[0], kv[1])
+		c.Assert(err, gc.IsNil)
+	}
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	c.Check(state.ShedCapacity[0], gc.Equals, 3) // m1 sheds fully.
+	c.Check(state.ShedCapacity[1], gc.Equals, 3) // m2 sheds fully.
+	c.Check(state.ShedCapacity[2], gc.Equals, 1) // m3 gets remaining excess.
+	c.Check(state.ShedCapacity[3], gc.Equals, 0) // non-exiting.
+}
+
+func (s *AllocStateSuite) TestShedCapacityOverloadedCluster(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+
+	// 2 members, capacity 2 each. MemberSlots = 4.
+	// 5 items at R:1. ItemSlots = 5. excessSlots = -1 (clamped to 0).
+	// No shedding is possible because the cluster is overloaded.
+	for _, kv := range [][2]string{
+		{"/root/items/item-a", `{"R": 1}`},
+		{"/root/items/item-b", `{"R": 1}`},
+		{"/root/items/item-c", `{"R": 1}`},
+		{"/root/items/item-d", `{"R": 1}`},
+		{"/root/items/item-e", `{"R": 1}`},
+
+		{"/root/members/zone-a#m1", `{"R": 2, "E": true}`},
+		{"/root/members/zone-b#m2", `{"R": 2, "E": false}`},
+
+		{"/root/assign/item-a#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-b#zone-a#m1#0", `consistent`},
+		{"/root/assign/item-c#zone-b#m2#0", `consistent`},
+		{"/root/assign/item-d#zone-b#m2#0", `consistent`},
+	} {
+		var _, err = client.Put(ctx, kv[0], kv[1])
+		c.Assert(err, gc.IsNil)
+	}
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	c.Check(state.ShedCapacity[0], gc.Equals, 0) // m1: no excess to shed.
+	c.Check(state.ShedCapacity[1], gc.Equals, 0)
+}
+
+func (s *AllocStateSuite) TestShedCapacityNoExiting(c *gc.C) {
+	var client, ctx = etcdtest.TestClient(), context.Background()
+	defer etcdtest.Cleanup()
+	buildAllocKeySpaceFixture(c, ctx, client)
+
+	var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
+	var state = NewObservedState(ks, MemberKey(ks, "us-west", "baz"), isConsistent)
+	c.Check(ks.Load(ctx, client, 0), gc.IsNil)
+
+	for i := range state.ShedCapacity {
+		c.Check(state.ShedCapacity[i], gc.Equals, 0)
+	}
+}
+
 var _ = gc.Suite(&AllocStateSuite{})
 
 func TestMain(m *testing.M) { etcdtest.TestMainWithEtcd(m) }

allocator/allocator_key_space.go

Lines changed: 2 additions & 0 deletions
@@ -34,6 +34,8 @@ const (
 type MemberValue interface {
 	// ItemLimit is the maximum number of Items this Member may be assigned.
 	ItemLimit() int
+	// IsExiting returns true if this Member has been signaled to exit.
+	IsExiting() bool
 }
 
 // ItemValue is a user-defined Item representation which also supports required

allocator/allocator_key_space_test.go

Lines changed: 8 additions & 4 deletions
@@ -354,11 +354,15 @@ func isConsistent(_ Item, assignment keyspace.KeyValue, allAssignments keyspace.
 	return assignment.Decoded.(Assignment).AssignmentValue.(testAssignment).consistent
 }
 
-type testMember struct{ R int }
+type testMember struct {
+	R int  // ItemLimit
+	E bool // Exiting
+}
 
-func (m testMember) ItemLimit() int { return m.R }
-func (m testMember) Validate() error { return nil }
-func (m *testMember) ZeroLimit() { m.R = 0 }
+func (m testMember) ItemLimit() int { return m.R }
+func (m testMember) Validate() error { return nil }
+func (m *testMember) SetExiting() { m.E = true }
+func (m testMember) IsExiting() bool { return m.E }
 
 func (m *testMember) MarshalString() string {
 	if b, err := json.Marshal(m); err != nil {
