Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
ExcludeTabletsWithMaxReplicationLag time.Duration
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -346,8 +347,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, maxReplicationLag: %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, tp.options.ExcludeTabletsWithMaxReplicationLag, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
case <-ctx.Done():
Expand Down Expand Up @@ -453,8 +454,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
(shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" &&
(tabletInfo.Tablet.Type == topodatapb.TabletType_PRIMARY /* lag is not relevant */ ||
(tp.options.ExcludeTabletsWithMaxReplicationLag == 0 /* not set */ ||
shr.RealtimeStats.ReplicationLagSeconds <= uint32(tp.options.ExcludeTabletsWithMaxReplicationLag.Seconds())))) {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
60 changes: 60 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,61 @@ func TestPickNonServingTablets(t *testing.T) {
assert.True(t, picked3)
}

// TestPickNonLaggingTablets validates that lagging tablets are excluded when the
// ExcludeTabletsWithMaxReplicationLag option is set.
func TestPickNonLaggingTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)
cells := []string{"cell1"}
defaultCell := cells[0]
tabletTypes := "replica"
options := TabletPickerOptions{
ExcludeTabletsWithMaxReplicationLag: lowReplicationLag.Default(),
}
replLag := options.ExcludeTabletsWithMaxReplicationLag + (5 * time.Second)
te := newPickerTestEnv(t, ctx, cells)

// Tablet should not be selected as we only want replicas.
primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, defaultCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is lagging.
laggingReplicaTablet := addTabletWithLag(ctx, te, 200, topodatapb.TabletType_REPLICA, defaultCell, true, true, uint32(replLag.Seconds()))
defer deleteTablet(t, te, laggingReplicaTablet)

// Tablet should be selected because it's a non-lagging replica.
nonLaggingReplicaTablet := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, defaultCell, true, true)
defer deleteTablet(t, te, nonLaggingReplicaTablet)

_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, defaultCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

var pickedPrimary, pickedLaggingReplica, pickedNonLaggingReplica int
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
pickedPrimary++
}
if proto.Equal(tablet, laggingReplicaTablet) {
pickedLaggingReplica++
}
if proto.Equal(tablet, nonLaggingReplicaTablet) {
pickedNonLaggingReplica++
}
}
require.Zero(t, pickedPrimary)
require.Zero(t, pickedLaggingReplica)
require.Equal(t, numTestIterations, pickedNonLaggingReplica)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -697,6 +752,10 @@ func newPickerTestEnv(t *testing.T, ctx context.Context, cells []string, extraCe
}

func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet {
return addTabletWithLag(ctx, te, id, tabletType, cell, serving, healthy, 0)
}

func addTabletWithLag(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool, replLagSecs uint32) *topodatapb.Tablet {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Expand Down Expand Up @@ -725,6 +784,7 @@ func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topoda
if healthy {
shr.RealtimeStats.HealthError = ""
}
shr.RealtimeStats.ReplicationLagSeconds = replLagSecs

_ = createFixedHealthConn(tablet, shr)

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
tabletPickerOptions: discovery.TabletPickerOptions{
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
// This is NOT configurable via the API because we check the
// discovery.GetLowReplicationLag().Seconds() value in the tablet
// health stream.
ExcludeTabletsWithMaxReplicationLag: discovery.GetLowReplicationLag(),
},
}
return vs.stream(ctx)
Expand Down
Loading