Skip to content

Commit 3cddbb1

Browse files
author
p00p
committed
[yt/admin/cms] Add increasing timeout between node decommission and ban in CMS.
* Changelog entry Type: feature Component: go-sdk Add increasing timeout between node decommission and ban in CMS. commit_hash:7ebe1ccb8bbfa9cc232fbd614389761f77030b90
1 parent 32a0963 commit 3cddbb1

File tree

4 files changed

+115
-17
lines changed

4 files changed

+115
-17
lines changed

.mapping.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"internal/cms/missing_chunks_throttler.go":"yt/admin/cms/internal/cms/missing_chunks_throttler.go",
3434
"internal/cms/mocks/storage_mock.go":"yt/admin/cms/internal/cms/mocks/storage_mock.go",
3535
"internal/cms/node.go":"yt/admin/cms/internal/cms/node.go",
36+
"internal/cms/node_ban_limiter.go":"yt/admin/cms/internal/cms/node_ban_limiter.go",
3637
"internal/cms/proxy_cache_mock.go":"yt/admin/cms/internal/cms/proxy_cache_mock.go",
3738
"internal/cms/proxy_role_limits.go":"yt/admin/cms/internal/cms/proxy_role_limits.go",
3839
"internal/cms/queue_agent.go":"yt/admin/cms/internal/cms/queue_agent.go",

internal/cms/node.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,8 @@ func (p *TaskProcessor) decommissionNode(ctx context.Context, r *models.Node) {
309309
nodeWithoutChunks := storedChunkCount == 0 && node.State == ytsys.NodeStateOnline
310310
nodeOffline := p.checkNodeOffline(node)
311311

312-
if !task.IsGroupTask && !nodeWithoutChunks && time.Since(p.lastNodeBanTime) < p.conf.BannedNodeHoldOffPeriod {
313-
p.l.Info("can not ban node as another one was banned recently", p.nodeLogFields(task, r)...)
314-
return
315-
}
316-
317-
if task.IsGroupTask && task.TaskGroup != p.lastBannedNodeGroup && time.Since(p.lastNodeBanTime) < p.conf.BannedNodeHoldOffPeriod {
312+
if !p.nodeBanLimiter.Allow() &&
313+
((!task.IsGroupTask && !nodeWithoutChunks) || (task.IsGroupTask && task.TaskGroup != p.lastBannedNodeGroup)) {
318314
p.l.Info("can not ban node as another one was banned recently", p.nodeLogFields(task, r)...)
319315
return
320316
}
@@ -348,7 +344,7 @@ func (p *TaskProcessor) decommissionNode(ctx context.Context, r *models.Node) {
348344
}
349345
p.l.Info("node banned", p.nodeLogFields(task, r)...)
350346

351-
p.lastNodeBanTime = time.Now()
347+
p.nodeBanLimiter.LastBanTime = time.Now()
352348
if task.IsGroupTask {
353349
p.lastBannedNodeGroup = task.TaskGroup
354350
}
@@ -539,13 +535,15 @@ func (p *TaskProcessor) nodeLogFields(t *models.Task, n *models.Node, extra ...l
539535
return fields
540536
}
541537

542-
func (p *TaskProcessor) unbanNodes(ctx context.Context) error {
538+
func (p *TaskProcessor) unbanNodesBecauseOfLVC(ctx context.Context) error {
543539
tasks, err := p.storage.GetAll(ctx)
544540
if err != nil {
545541
return err
546542
}
547543

548544
var firstError error
545+
var unbanned bool
546+
549547
for _, t := range tasks {
550548
if t.WalleStatus == walle.StatusOK {
551549
continue
@@ -558,15 +556,25 @@ func (p *TaskProcessor) unbanNodes(ctx context.Context) error {
558556
}
559557

560558
n := r.Role.(*models.Node)
559+
if time.Since(time.Time(n.BanTime)) > p.conf.LVCUnbanWindow {
560+
continue
561+
}
562+
561563
p.l.Info("unbanning node", p.nodeLogFields(t, n)...)
562564
err := p.unbanNode(ctx, t, n)
563565
if err != nil && firstError == nil {
564566
firstError = err
567+
} else if err == nil {
568+
unbanned = true
565569
}
566570
}
567571
}
568572
}
569573

574+
if unbanned {
575+
p.nodeBanLimiter.Unbanned()
576+
}
577+
570578
return firstError
571579
}
572580

internal/cms/node_ban_limiter.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cms
2+
3+
import (
4+
"time"
5+
)
6+
7+
// NodeBanLimiter limits how often nodes may be banned.
//
// It remembers the time of the latest ban and the times of recent unbans
// caused by LVC, and requires a longer pause between bans the more such
// unbans happened recently.
//
// Unbanned() method must be called when a node was unbanned because of LVC,
// Allow() method can be called to check if another node can be banned.
//
// The limiter performs no synchronization of its own.
type NodeBanLimiter struct {
	// LastBanTime is the time of the latest node ban. It is read by Allow and
	// is expected to be maintained by the owner of the limiter. With the zero
	// value the elapsed time saturates, so Allow reports true.
	LastBanTime time.Time
	// minTimeout is the pause required when there were no recent LVC unbans.
	minTimeout time.Duration
	// maxTimeout is the pause required after two or more recent LVC unbans;
	// 3*maxTimeout is also the window during which an unban counts as recent.
	maxTimeout time.Duration
	// unbanTimes holds the times of LVC unbans that are still inside the window.
	unbanTimes []time.Time
}

// NewNodeBanLimiter creates and initializes new node ban limiter.
//
// minTimeout is the pause between bans when no LVC unbans were seen recently,
// maxTimeout is the pause after repeated LVC unbans. If maxTimeout is smaller
// than minTimeout it is raised to minTimeout, so that the pause never shrinks
// in response to LVC unbans.
func NewNodeBanLimiter(minTimeout, maxTimeout time.Duration) *NodeBanLimiter {
	if maxTimeout < minTimeout {
		maxTimeout = minTimeout
	}
	return &NodeBanLimiter{
		minTimeout: minTimeout,
		maxTimeout: maxTimeout,
	}
}

// Allow returns true if another node can be banned.
//
// It checks that "timeout" has expired since LastBanTime,
// "timeout" is calculated as follows:
// if Unbanned() method wasn't called during 3*maxTimeout duration,
// "timeout" is minTimeout, if Unbanned() method was called once
// during 3*maxTimeout duration, "timeout" is (minTimeout + maxTimeout) / 2,
// otherwise "timeout" is maxTimeout.
func (l *NodeBanLimiter) Allow() bool {
	return l.allow(l.LastBanTime, time.Now().UTC())
}

// allow is the clock-independent core of Allow: it drops expired unbans as of
// now, picks the timeout from the number of remaining ones and reports
// whether at least that much time has passed since lastBanTime.
func (l *NodeBanLimiter) allow(lastBanTime, now time.Time) bool {
	l.update(now)

	var timeout time.Duration
	switch len(l.unbanTimes) {
	case 0:
		timeout = l.minTimeout
	case 1:
		timeout = (l.minTimeout + l.maxTimeout) / 2
	default:
		timeout = l.maxTimeout
	}

	// For a zero lastBanTime, Sub saturates at the maximum duration,
	// so the comparison holds for any timeout.
	return now.Sub(lastBanTime) >= timeout
}

// Unbanned must be called when a node was unbanned because of LVC.
func (l *NodeBanLimiter) Unbanned() {
	l.unbanned(time.Now().UTC())
}

// unbanned records an LVC unban that happened at now,
// dropping expired unbans first.
func (l *NodeBanLimiter) unbanned(now time.Time) {
	l.update(now)
	l.unbanTimes = append(l.unbanTimes, now)
}

// update forgets unbans that are 3*maxTimeout old or older as of now.
func (l *NodeBanLimiter) update(now time.Time) {
	window := 3 * l.maxTimeout

	// Filter in place: the write position never overtakes the read position,
	// so the backing array can be reused without a per-call allocation.
	kept := l.unbanTimes[:0]
	for _, t := range l.unbanTimes {
		if now.Sub(t) < window {
			kept = append(kept, t)
		}
	}
	l.unbanTimes = kept
}

internal/cms/task_processor.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626
defaultMinGroupTaskSize = 2
2727
defaultOfflineNodeRetentionPeriod = time.Minute * 20
2828
defaultBannedNodeHoldOffPeriod = time.Minute * 3
29+
defaultLVCUnbanWindow = time.Hour * 2
2930
defaultDisabledSchedulerJobsWaitTimeout = time.Minute * 20
3031
defaultDisabledWriteSessionsWaitTimeout = time.Minute * 20
3132
defaultOfflineHTTPProxyRetentionPeriod = time.Minute * 10
@@ -99,6 +100,10 @@ type TaskProcessorConfig struct {
99100

100101
OfflineNodeRetentionPeriod time.Duration `yaml:"offline_node_retention_period"`
101102
BannedNodeHoldOffPeriod time.Duration `yaml:"banned_node_hold_off_period"`
103+
MaxBannedNodeHoldOffPeriod time.Duration `yaml:"max_banned_node_hold_off_period"`
104+
// LVCUnbanWindow is a duration, after which node will not be unbanned because of LVC.
105+
// It is assumed that all lost chunks will be found before LVCUnbanWindow has expired.
106+
LVCUnbanWindow time.Duration `yaml:"lvc_unban_window"`
102107
// DisabledSchedulerJobsWaitTimeout is a max time to wait for node's scheduler jobs to finish
103108
// in fast decommission scenarios.
104109
DisabledSchedulerJobsWaitTimeout time.Duration `yaml:"disabled_scheduler_jobs_wait_timeout"`
@@ -194,6 +199,14 @@ func (c *TaskProcessorConfig) UnmarshalYAML(unmarshal func(any) error) error {
194199
c.BannedNodeHoldOffPeriod = defaultBannedNodeHoldOffPeriod
195200
}
196201

202+
if c.MaxBannedNodeHoldOffPeriod <= 0 {
203+
c.MaxBannedNodeHoldOffPeriod = 3 * c.BannedNodeHoldOffPeriod
204+
}
205+
206+
if c.LVCUnbanWindow <= 0 {
207+
c.LVCUnbanWindow = defaultLVCUnbanWindow
208+
}
209+
197210
if c.DisabledSchedulerJobsWaitTimeout <= 0 {
198211
c.DisabledSchedulerJobsWaitTimeout = defaultDisabledSchedulerJobsWaitTimeout
199212
}
@@ -313,12 +326,12 @@ type TaskProcessor struct {
313326
// hostAnnotations store additional host information retrieved from Wall-e.
314327
hostAnnotations *HostAnnotations
315328

316-
// lastNodeBanTime stores a time of the latest node ban made by CMS.
317-
// This value is used to limit the number of parallel node bans.
318-
lastNodeBanTime time.Time
319329
// lastBannedNodeGroup stores group name of a last banned node
320330
// belonging to a group task.
321331
lastBannedNodeGroup string
332+
// nodeBanLimiter stores a time of the latest node ban and unbans made by CMS.
333+
// It is used to limit the number of parallel node bans, increasing ban interval, if LVC found.
334+
nodeBanLimiter *NodeBanLimiter
322335

323336
chunkIntegrity *ytsys.ChunkIntegrity
324337
missingChunksThrottler *MissingChunksThrottler
@@ -379,6 +392,7 @@ func (p *TaskProcessor) reset(tasks []*models.Task) {
379392
p.rpcProxyRoleLimits = NewProxyRoleLimits(p.Conf().MaxRPCProxiesPerRole, &rpcProxyCache{c: p.cluster})
380393
p.initRateLimiter(tasks)
381394
p.initGPURateLimiter(tasks)
395+
p.nodeBanLimiter = NewNodeBanLimiter(p.conf.BannedNodeHoldOffPeriod, p.conf.MaxBannedNodeHoldOffPeriod)
382396
p.initLastNodeBanTime(tasks)
383397
}
384398

@@ -429,24 +443,27 @@ func (p *TaskProcessor) initGPURateLimiter(tasks []*models.Task) {
429443

430444
// initLastNodeBanTime initializes last node ban time.
431445
func (p *TaskProcessor) initLastNodeBanTime(tasks []*models.Task) {
432-
if !p.lastNodeBanTime.IsZero() {
446+
if !p.nodeBanLimiter.LastBanTime.IsZero() {
433447
return
434448
}
435449

450+
var lastBanTime time.Time
451+
436452
for _, t := range tasks {
437453
for _, n := range t.GetNodes() {
438454
banTime := time.Time(n.BanTime)
439-
if n.Banned && banTime.After(p.lastNodeBanTime) {
440-
p.lastNodeBanTime = banTime
455+
if n.Banned && banTime.After(lastBanTime) {
456+
lastBanTime = banTime
441457
if t.IsGroupTask {
442458
p.lastBannedNodeGroup = t.TaskGroup
443459
}
444460
}
445461
}
446462
}
447463

464+
p.nodeBanLimiter.LastBanTime = lastBanTime
448465
p.l.Info("setting last node ban time",
449-
log.Time("last_node_ban_time", p.lastNodeBanTime),
466+
log.Time("last_node_ban_time", lastBanTime),
450467
log.String("last_banned_node_group", p.lastBannedNodeGroup))
451468
}
452469

@@ -656,10 +673,10 @@ func (p *TaskProcessor) checkChunkIntegrity(ctx context.Context) error {
656673

657674
if i.LVC > 0 || i.QMC > 0 {
658675
p.l.Info("LVC > 0 or QMC > 0 -> unbanning nodes", log.Int64("lvc", i.LVC), log.Int64("qmc", i.QMC))
659-
if err := p.unbanNodes(ctx); err != nil {
676+
if err := p.unbanNodesBecauseOfLVC(ctx); err != nil {
660677
p.l.Error("error unbanning nodes", log.Error(err))
661678
} else {
662-
p.lastNodeBanTime = time.Time{}
679+
p.nodeBanLimiter.LastBanTime = time.Time{}
663680
}
664681
} else {
665682
p.l.Info("LVC is 0")

0 commit comments

Comments
 (0)