Skip to content

Commit 992b621

Browse files
authored
Increase health check channel buffer (vitessio#17821) (#624)
* Increase health check channel buffer (vitessio#17821) Signed-off-by: Malcolm Akinje <makinje@slack-corp.com> * go mod tidy Signed-off-by: Malcolm Akinje <makinje@slack-corp.com> --------- Signed-off-by: Malcolm Akinje <makinje@slack-corp.com>
1 parent f153dbc commit 992b621

File tree

4 files changed

+50
-3
lines changed

4 files changed

+50
-3
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ require (
180180
github.com/tidwall/pretty v1.2.0 // indirect
181181
go.opencensus.io v0.23.0 // indirect
182182
go.uber.org/atomic v1.7.0 // indirect
183+
go.uber.org/goleak v1.3.0 // indirect
183184
go.uber.org/multierr v1.6.0 // indirect
184185
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
185186
google.golang.org/appengine v1.6.7 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -799,8 +799,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
799799
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
800800
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
801801
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
802-
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
803-
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
802+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
803+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
804804
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
805805
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
806806
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=

go/vt/discovery/healthcheck.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
"vitess.io/vitess/go/vt/servenv"
5959
"vitess.io/vitess/go/vt/topo"
6060
"vitess.io/vitess/go/vt/topo/topoproto"
61+
"vitess.io/vitess/go/vt/topotools"
6162
"vitess.io/vitess/go/vt/vterrors"
6263
"vitess.io/vitess/go/vt/vttablet/queryservice"
6364
)
@@ -68,6 +69,9 @@ var (
6869
hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"})
6970
healthcheckOnce sync.Once
7071

72+
// counter that tells us how many healthcheck messages have been dropped
73+
hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full")
74+
7175
// TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers.
7276
TabletURLTemplateString = "http://{{.GetTabletHostPort}}"
7377
tabletURLTemplate *template.Template
@@ -598,7 +602,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
598602
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
599603
hc.subMu.Lock()
600604
defer hc.subMu.Unlock()
601-
c := make(chan *TabletHealth, 2)
605+
c := make(chan *TabletHealth, 2048)
602606
hc.subscribers[c] = struct{}{}
603607
return c
604608
}
@@ -617,6 +621,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
617621
select {
618622
case c <- th:
619623
default:
624+
// If the channel is full, we drop the message.
625+
hcChannelFullCounter.Add(1)
626+
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
620627
}
621628
}
622629
}

go/vt/discovery/healthcheck_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"io"
2525
"strings"
2626
"sync"
27+
"sync/atomic"
2728
"testing"
2829
"time"
2930

@@ -1307,6 +1308,44 @@ func TestDebugURLFormatting(t *testing.T) {
13071308
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
13081309
}
13091310

1311+
// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
1312+
// Added in response to https://github.com/vitessio/vitess/issues/17629.
1313+
func TestConcurrentUpdates(t *testing.T) {
1314+
// reset error counters
1315+
hcErrorCounters.ResetAll()
1316+
ts := memorytopo.NewServer("cell")
1317+
defer ts.Close()
1318+
hc := createTestHc(ts)
1319+
// close healthcheck
1320+
defer hc.Close()
1321+
1322+
// Subscribe to the healthcheck
1323+
// Make the receiver keep track of the updates received.
1324+
ch := hc.Subscribe()
1325+
var totalCount atomic.Int32
1326+
go func() {
1327+
for range ch {
1328+
totalCount.Add(1)
1329+
// Simulate a somewhat slow consumer.
1330+
time.Sleep(100 * time.Millisecond)
1331+
}
1332+
}()
1333+
1334+
// Run multiple updates really quickly
1335+
// one after the other.
1336+
totalUpdates := 10
1337+
for i := 0; i < totalUpdates; i++ {
1338+
hc.broadcast(&TabletHealth{})
1339+
}
1340+
// Unsubscribe from the healthcheck
1341+
// and verify we process all the updates eventually.
1342+
hc.Unsubscribe(ch)
1343+
defer close(ch)
1344+
require.Eventuallyf(t, func() bool {
1345+
return totalUpdates == int(totalCount.Load())
1346+
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
1347+
}
1348+
13101349
func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
13111350
connMapMu.Lock()
13121351
defer connMapMu.Unlock()

0 commit comments

Comments
 (0)