Skip to content

Commit c6287dd

Browse files
backport 17872 (#756)
Co-authored-by: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
1 parent ba21434 commit c6287dd

File tree

11 files changed

+731
-126
lines changed

11 files changed

+731
-126
lines changed

go/vt/discovery/fake_healthcheck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
192192
}
193193

194194
// GetLoadTabletsTrigger is not implemented.
195-
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan struct{} {
195+
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
196196
return nil
197197
}
198198

go/vt/discovery/healthcheck.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ type HealthCheck interface {
252252
Unsubscribe(c chan *TabletHealth)
253253

254254
// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
255-
GetLoadTabletsTrigger() chan struct{}
255+
GetLoadTabletsTrigger() chan topo.KeyspaceShard
256256
}
257257

258258
var _ HealthCheck = (*HealthCheckImpl)(nil)
@@ -302,8 +302,8 @@ type HealthCheckImpl struct {
302302
subMu sync.Mutex
303303
// subscribers
304304
subscribers map[chan *TabletHealth]struct{}
305-
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
306-
loadTabletsTrigger chan struct{}
305+
// loadTabletsTrigger is used to immediately load information about tablets of a specific shard.
306+
loadTabletsTrigger chan topo.KeyspaceShard
307307
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
308308
healthCheckDialSem *semaphore.Weighted
309309
}
@@ -371,7 +371,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
371371
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
372372
subscribers: make(map[chan *TabletHealth]struct{}),
373373
cellAliases: make(map[string]string),
374-
loadTabletsTrigger: make(chan struct{}, 1),
374+
loadTabletsTrigger: make(chan topo.KeyspaceShard, 1024),
375375
}
376376
var topoWatchers []*TopologyWatcher
377377
cells := strings.Split(cellsToWatch, ",")
@@ -543,18 +543,21 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
543543
}
544544

545545
// If the previous tablet type was primary, we need to check if the next new primary has already been assigned.
546-
// If no new primary has been assigned, we will trigger a `loadTablets` call to immediately redirect traffic to the new primary.
546+
// If no new primary has been assigned, we will trigger loading of tablets for this keyspace shard to immediately redirect traffic to the new primary.
547547
//
548548
// This is to avoid a situation where a newly primary tablet for a shard has just been started and the tableRefreshInterval has not yet passed,
549549
// causing an interruption where no primary is assigned to the shard.
550550
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
551551
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
552552
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
553-
// We want to trigger a loadTablets call, but if the channel is not empty
554-
// then a trigger is already scheduled, we don't need to trigger another one.
555-
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
553+
// We want to trigger a call to load tablets for this keyspace-shard,
554+
// but we want this to be non-blocking to prevent the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
555+
// If the buffer is exhausted, then we'll just receive the update when all the tablets are loaded on the ticker.
556556
select {
557-
case hc.loadTabletsTrigger <- struct{}{}:
557+
case hc.loadTabletsTrigger <- topo.KeyspaceShard{
558+
Keyspace: prevTarget.Keyspace,
559+
Shard: prevTarget.Shard,
560+
}:
558561
default:
559562
}
560563
}
@@ -670,7 +673,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
670673
}
671674

672675
// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
673-
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan struct{} {
676+
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
674677
return hc.loadTabletsTrigger
675678
}
676679

go/vt/discovery/healthcheck_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,68 @@ func TestPrimaryInOtherCell(t *testing.T) {
12271227
mustMatch(t, want, a[0], "Expecting healthy primary")
12281228
}
12291229

1230+
// TestLoadTabletsTrigger tests that we send the correct information on the load tablets trigger.
1231+
func TestLoadTabletsTrigger(t *testing.T) {
1232+
ctx := utils.LeakCheckContext(t)
1233+
1234+
// create a health check instance.
1235+
hc := NewHealthCheck(ctx, time.Hour, time.Hour, nil, "", "", nil)
1236+
defer hc.Close()
1237+
1238+
ks := "keyspace"
1239+
shard := "shard"
1240+
// Add a tablet to the topology.
1241+
tablet1 := &topodatapb.Tablet{
1242+
Alias: &topodatapb.TabletAlias{
1243+
Cell: "zone-1",
1244+
Uid: 100,
1245+
},
1246+
Type: topodatapb.TabletType_REPLICA,
1247+
Hostname: "host1",
1248+
PortMap: map[string]int32{
1249+
"grpc": 123,
1250+
},
1251+
Keyspace: ks,
1252+
Shard: shard,
1253+
}
1254+
1255+
// We want to run updateHealth with arguments that always
1256+
// make it trigger load Tablets.
1257+
th := &TabletHealth{
1258+
Tablet: tablet1,
1259+
Target: &querypb.Target{
1260+
Keyspace: ks,
1261+
Shard: shard,
1262+
TabletType: topodatapb.TabletType_REPLICA,
1263+
},
1264+
}
1265+
prevTarget := &querypb.Target{
1266+
Keyspace: ks,
1267+
Shard: shard,
1268+
TabletType: topodatapb.TabletType_PRIMARY,
1269+
}
1270+
hc.AddTablet(tablet1)
1271+
1272+
numTriggers := 10
1273+
for i := 0; i < numTriggers; i++ {
1274+
// Since the previous target was a primary, and there are no other
1275+
// primary tablets for the given keyspace shard, we will see the healtcheck
1276+
// send on the loadTablets trigger. We just want to verify the information
1277+
// there is correct.
1278+
hc.updateHealth(th, prevTarget, false, false)
1279+
}
1280+
1281+
ch := hc.GetLoadTabletsTrigger()
1282+
require.Len(t, ch, numTriggers)
1283+
for i := 0; i < numTriggers; i++ {
1284+
// Read from the channel and verify we indeed have the right values.
1285+
kss := <-ch
1286+
require.EqualValues(t, ks, kss.Keyspace)
1287+
require.EqualValues(t, shard, kss.Shard)
1288+
}
1289+
require.Len(t, ch, 0)
1290+
}
1291+
12301292
func TestReplicaInOtherCell(t *testing.T) {
12311293
ctx := utils.LeakCheckContext(t)
12321294

go/vt/discovery/topology_watcher.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,39 @@ func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
110110
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
111111
}
112112

113+
func (tw *TopologyWatcher) getTabletsByShard(keyspace string, shard string) ([]*topo.TabletInfo, error) {
114+
return tw.topoServer.GetTabletsByShardCell(tw.ctx, keyspace, shard, []string{tw.cell})
115+
}
116+
113117
// Start starts the topology watcher.
114118
func (tw *TopologyWatcher) Start() {
115119
tw.wg.Add(1)
120+
// Goroutine to refresh the tablets list periodically.
116121
go func(t *TopologyWatcher) {
117122
defer t.wg.Done()
118123
ticker := time.NewTicker(t.refreshInterval)
119124
defer ticker.Stop()
125+
t.loadTablets()
120126
for {
121-
t.loadTablets()
122127
select {
123128
case <-t.ctx.Done():
124129
return
125-
case <-tw.healthcheck.GetLoadTabletsTrigger():
130+
case kss := <-t.healthcheck.GetLoadTabletsTrigger():
131+
t.loadTabletsForKeyspaceShard(kss.Keyspace, kss.Shard)
126132
case <-ticker.C:
133+
// Since we are going to load all the tablets,
134+
// we can clear out the entire list for reloading
135+
// specific keyspace shards.
136+
func() {
137+
for {
138+
select {
139+
case <-t.healthcheck.GetLoadTabletsTrigger():
140+
default:
141+
return
142+
}
143+
}
144+
}()
145+
t.loadTablets()
127146
}
128147
}
129148
}(tw)
@@ -136,10 +155,23 @@ func (tw *TopologyWatcher) Stop() {
136155
tw.wg.Wait()
137156
}
138157

158+
func (tw *TopologyWatcher) loadTabletsForKeyspaceShard(keyspace string, shard string) {
159+
if keyspace == "" || shard == "" {
160+
log.Errorf("topologyWatcher: loadTabletsForKeyspaceShard: keyspace and shard are required")
161+
return
162+
}
163+
tabletInfos, err := tw.getTabletsByShard(keyspace, shard)
164+
if err != nil {
165+
log.Errorf("error getting tablets for keyspace-shard: %v:%v: %v", keyspace, shard, err)
166+
return
167+
}
168+
// Since we are only reading tablets for a keyspace shard,
169+
// this is by default a partial result.
170+
tw.storeTabletInfos(tabletInfos /* partialResults */, true)
171+
}
172+
139173
func (tw *TopologyWatcher) loadTablets() {
140-
newTablets := make(map[string]*tabletInfo)
141174
var partialResult bool
142-
143175
// First get the list of all tablets.
144176
tabletInfos, err := tw.getTablets()
145177
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
@@ -155,6 +187,11 @@ func (tw *TopologyWatcher) loadTablets() {
155187
}
156188
}
157189

190+
tw.storeTabletInfos(tabletInfos, partialResult)
191+
}
192+
193+
func (tw *TopologyWatcher) storeTabletInfos(tabletInfos []*topo.TabletInfo, partialResult bool) {
194+
newTablets := make(map[string]*tabletInfo)
158195
// Accumulate a list of all known alias strings to use later
159196
// when sorting.
160197
tabletAliasStrs := make([]string, 0, len(tabletInfos))
@@ -243,7 +280,6 @@ func (tw *TopologyWatcher) loadTablets() {
243280
}
244281
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
245282
tw.lastRefresh = time.Now()
246-
247283
}
248284

249285
// RefreshLag returns the time since the last refresh.

go/vt/discovery/topology_watcher_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,71 @@ func TestFilterByKeyspace(t *testing.T) {
487487
}
488488
}
489489

490+
// TestLoadTablets tests that loadTablets works as intended for the given inputs.
491+
func TestLoadTablets(t *testing.T) {
492+
ctx := utils.LeakCheckContext(t)
493+
494+
hc := NewFakeHealthCheck(nil)
495+
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
496+
ts := memorytopo.NewServer(ctx, testCell)
497+
defer ts.Close()
498+
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true)
499+
500+
// Add 2 tablets from 2 different tracked keyspaces to the topology.
501+
tablet1 := &topodatapb.Tablet{
502+
Alias: &topodatapb.TabletAlias{
503+
Cell: testCell,
504+
Uid: 0,
505+
},
506+
Hostname: "host1",
507+
PortMap: map[string]int32{
508+
"vt": 123,
509+
},
510+
Keyspace: "ks1",
511+
Shard: "shard",
512+
}
513+
tablet2 := &topodatapb.Tablet{
514+
Alias: &topodatapb.TabletAlias{
515+
Cell: testCell,
516+
Uid: 10,
517+
},
518+
Hostname: "host2",
519+
PortMap: map[string]int32{
520+
"vt": 124,
521+
},
522+
Keyspace: "ks4",
523+
Shard: "shard",
524+
}
525+
for _, ks := range testKeyspacesToWatch {
526+
_, err := ts.GetOrCreateShard(ctx, ks, "shard")
527+
require.NoError(t, err)
528+
}
529+
require.NoError(t, ts.CreateTablet(ctx, tablet1))
530+
require.NoError(t, ts.CreateTablet(ctx, tablet2))
531+
532+
// Let's refresh the information for a different keyspace shard. We shouldn't
533+
// reload either tablet's information.
534+
tw.loadTabletsForKeyspaceShard("ks2", "shard")
535+
key1 := TabletToMapKey(tablet1)
536+
key2 := TabletToMapKey(tablet2)
537+
allTablets := hc.GetAllTablets()
538+
assert.NotContains(t, allTablets, key1)
539+
assert.NotContains(t, allTablets, key2)
540+
541+
// Now, if we reload the first tablet's shard, we should see this tablet
542+
// but not the other.
543+
tw.loadTabletsForKeyspaceShard("ks1", "shard")
544+
allTablets = hc.GetAllTablets()
545+
assert.Contains(t, allTablets, key1)
546+
assert.NotContains(t, allTablets, key2)
547+
548+
// Finally, if we load all the tablets, both the tablets should be visible.
549+
tw.loadTablets()
550+
allTablets = hc.GetAllTablets()
551+
assert.Contains(t, allTablets, key1)
552+
assert.Contains(t, allTablets, key2)
553+
}
554+
490555
// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher
491556
// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want
492557
// to ensure that the TopologyWatcher:

go/vt/topo/shard.go

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import (
2727
"sync"
2828
"time"
2929

30-
"golang.org/x/sync/errgroup"
31-
3230
"vitess.io/vitess/go/constants/sidecar"
3331
"vitess.io/vitess/go/event"
3432
"vitess.io/vitess/go/protoutil"
@@ -661,47 +659,20 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
661659
defer span.Finish()
662660
var err error
663661

664-
if len(cells) == 0 {
665-
cells, err = ts.GetCellInfoNames(ctx)
666-
if err != nil {
667-
return nil, err
668-
}
669-
if len(cells) == 0 { // Nothing to do
670-
return nil, nil
671-
}
662+
// if we get a partial result, we keep going. It most likely means
663+
// a cell is out of commission.
664+
aliases, err := ts.FindAllTabletAliasesInShardByCell(ctx, keyspace, shard, cells)
665+
if err != nil && !IsErrType(err, PartialResult) {
666+
return nil, err
672667
}
673668

674-
mu := sync.Mutex{}
675-
eg, ctx := errgroup.WithContext(ctx)
676-
677-
tablets := make([]*TabletInfo, 0, len(cells))
678-
var kss *KeyspaceShard
679-
if keyspace != "" {
680-
kss = &KeyspaceShard{
681-
Keyspace: keyspace,
682-
Shard: shard,
683-
}
684-
}
685-
options := &GetTabletsByCellOptions{
686-
KeyspaceShard: kss,
687-
}
688-
for _, cell := range cells {
689-
eg.Go(func() error {
690-
t, err := ts.GetTabletsByCell(ctx, cell, options)
691-
if err != nil {
692-
return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell)
693-
}
694-
mu.Lock()
695-
defer mu.Unlock()
696-
tablets = append(tablets, t...)
697-
return nil
698-
})
699-
}
700-
if err := eg.Wait(); err != nil {
701-
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
702-
return tablets, NewError(PartialResult, shard)
669+
// get the tablets for the cells we were able to reach, forward
670+
// ErrPartialResult from FindAllTabletAliasesInShard
671+
result, gerr := ts.GetTabletList(ctx, aliases, nil)
672+
if gerr == nil && err != nil {
673+
gerr = err
703674
}
704-
return tablets, nil
675+
return result, gerr
705676
}
706677

707678
// GetTabletMapForShard returns the tablets for a shard. It can return

0 commit comments

Comments
 (0)