Skip to content

Commit 65c6ef8

Browse files
vitess-bot[bot]timvaillancourtGuptaManan100
committed
[release-19.0] vtorc: require topo for Healthy: true in /debug/health (vitessio#17129) (vitessio#17351)
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> Signed-off-by: Manan Gupta <manan@planetscale.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Tim Vaillancourt <tim@timvaillancourt.com> Co-authored-by: Manan Gupta <manan@planetscale.com>
1 parent c812bd2 commit 65c6ef8

File tree

8 files changed

+164
-43
lines changed

8 files changed

+164
-43
lines changed

go/test/endtoend/vtorc/api/api_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ func TestAPIEndpoints(t *testing.T) {
4848
status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool {
4949
return code == 0
5050
})
51-
// When VTOrc is up and hasn't run the topo-refresh, it should be healthy but DiscoveredOnce should be false.
52-
assert.Equal(t, 500, status)
51+
// When VTOrc starts it runs OpenTabletDiscovery(), which triggers a topo-refresh. VTOrc should be healthy and DiscoveredOnce should be true.
52+
assert.Equal(t, 200, status)
5353
assert.Contains(t, resp, `"Healthy": true,`)
54-
assert.Contains(t, resp, `"DiscoveredOnce": false`)
54+
assert.Contains(t, resp, `"DiscoveredOnce": true`)
5555

5656
// find primary from topo
5757
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)

go/vt/vtorc/logic/keyspace_shard_discovery.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,16 @@ import (
2929
)
3030

3131
// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
32-
func RefreshAllKeyspacesAndShards() {
32+
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
3333
var keyspaces []string
3434
if len(clustersToWatch) == 0 { // all known keyspaces
35-
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
35+
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
3636
defer cancel()
3737
var err error
3838
// Get all the keyspaces
3939
keyspaces, err = ts.GetKeyspaces(ctx)
4040
if err != nil {
41-
log.Error(err)
42-
return
41+
return err
4342
}
4443
} else {
4544
// Parse input and build list of keyspaces
@@ -55,14 +54,14 @@ func RefreshAllKeyspacesAndShards() {
5554
}
5655
if len(keyspaces) == 0 {
5756
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
58-
return
57+
return nil
5958
}
6059
}
6160

6261
// Sort the list of keyspaces.
6362
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
6463
sort.Strings(keyspaces)
65-
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
64+
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
6665
defer refreshCancel()
6766
var wg sync.WaitGroup
6867
for idx, keyspace := range keyspaces {
@@ -83,6 +82,8 @@ func RefreshAllKeyspacesAndShards() {
8382
}(keyspace)
8483
}
8584
wg.Wait()
85+
86+
return nil
8687
}
8788

8889
// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.

go/vt/vtorc/logic/keyspace_shard_discovery_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
9393
// Set clusters to watch to only watch ks1 and ks3
9494
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
9595
clustersToWatch = onlyKs1and3
96-
RefreshAllKeyspacesAndShards()
96+
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
9797

9898
// Verify that we only have ks1 and ks3 in vtorc's db.
9999
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
@@ -108,7 +108,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
108108
clustersToWatch = nil
109109
// Change the durability policy of ks1
110110
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", "semi_sync")
111-
RefreshAllKeyspacesAndShards()
111+
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
112112

113113
// Verify that all the keyspaces are correctly reloaded
114114
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")

go/vt/vtorc/logic/tablet_discovery.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"time"
2828

2929
"github.com/spf13/pflag"
30-
3130
"google.golang.org/protobuf/encoding/prototext"
3231
"google.golang.org/protobuf/proto"
3332

@@ -70,30 +69,36 @@ func OpenTabletDiscovery() <-chan time.Time {
7069
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
7170
log.Error(err)
7271
}
72+
// We refresh all information from the topo once before we start the ticks to do
73+
// it on a timer.
74+
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
75+
defer cancel()
76+
if err := refreshAllInformation(ctx); err != nil {
77+
log.Errorf("failed to initialize topo information: %+v", err)
78+
}
7379
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
7480
}
7581

7682
// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
77-
func refreshAllTablets() {
78-
refreshTabletsUsing(func(tabletAlias string) {
83+
func refreshAllTablets(ctx context.Context) error {
84+
return refreshTabletsUsing(ctx, func(tabletAlias string) {
7985
DiscoverInstance(tabletAlias, false /* forceDiscovery */)
8086
}, false /* forceRefresh */)
8187
}
8288

83-
func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
89+
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
8490
if !IsLeaderOrActive() {
85-
return
91+
return nil
8692
}
8793
if len(clustersToWatch) == 0 { // all known clusters
88-
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
94+
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
8995
defer cancel()
9096
cells, err := ts.GetKnownCells(ctx)
9197
if err != nil {
92-
log.Error(err)
93-
return
98+
return err
9499
}
95100

96-
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
101+
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
97102
defer refreshCancel()
98103
var wg sync.WaitGroup
99104
for _, cell := range cells {
@@ -114,7 +119,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
114119
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
115120
} else {
116121
// Assume this is a keyspace and find all shards in keyspace
117-
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
122+
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
118123
defer cancel()
119124
shards, err := ts.GetShardNames(ctx, ks)
120125
if err != nil {
@@ -133,9 +138,9 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
133138
}
134139
if len(keyspaceShards) == 0 {
135140
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
136-
return
141+
return nil
137142
}
138-
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
143+
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
139144
defer refreshCancel()
140145
var wg sync.WaitGroup
141146
for _, ks := range keyspaceShards {
@@ -147,6 +152,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
147152
}
148153
wg.Wait()
149154
}
155+
return nil
150156
}
151157

152158
func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {

go/vt/vtorc/logic/vtorc.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package logic
1818

1919
import (
20+
"context"
2021
"os"
2122
"os/signal"
2223
"sync"
@@ -26,6 +27,7 @@ import (
2627

2728
"github.com/patrickmn/go-cache"
2829
"github.com/sjmudd/stopwatch"
30+
"golang.org/x/sync/errgroup"
2931

3032
"vitess.io/vitess/go/stats"
3133
"vitess.io/vitess/go/vt/log"
@@ -407,27 +409,34 @@ func ContinuousDiscovery() {
407409
}
408410
}()
409411
case <-tabletTopoTick:
410-
// Create a wait group
411-
var wg sync.WaitGroup
412+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.Config.TopoInformationRefreshSeconds))
413+
if err := refreshAllInformation(ctx); err != nil {
414+
log.Errorf("failed to refresh topo information: %+v", err)
415+
}
416+
cancel()
417+
}
418+
}
419+
}
412420

413-
// Refresh all keyspace information.
414-
wg.Add(1)
415-
go func() {
416-
defer wg.Done()
417-
RefreshAllKeyspacesAndShards()
418-
}()
421+
// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
422+
func refreshAllInformation(ctx context.Context) error {
423+
// Create an errgroup
424+
eg, ctx := errgroup.WithContext(ctx)
419425

420-
// Refresh all tablets.
421-
wg.Add(1)
422-
go func() {
423-
defer wg.Done()
424-
refreshAllTablets()
425-
}()
426+
// Refresh all keyspace information.
427+
eg.Go(func() error {
428+
return RefreshAllKeyspacesAndShards(ctx)
429+
})
426430

427-
// Wait for both the refreshes to complete
428-
wg.Wait()
429-
// We have completed one discovery cycle in the entirety of it. We should update the process health.
430-
process.FirstDiscoveryCycleComplete.Store(true)
431-
}
431+
// Refresh all tablets.
432+
eg.Go(func() error {
433+
return refreshAllTablets(ctx)
434+
})
435+
436+
// Wait for both the refreshes to complete
437+
err := eg.Wait()
438+
if err == nil {
439+
process.FirstDiscoveryCycleComplete.Store(true)
432440
}
441+
return err
433442
}

go/vt/vtorc/logic/vtorc_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package logic
22

33
import (
4+
"context"
45
"sync/atomic"
56
"testing"
67
"time"
78

89
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
12+
"vitess.io/vitess/go/vt/topo/memorytopo"
13+
"vitess.io/vitess/go/vt/vtorc/db"
14+
"vitess.io/vitess/go/vt/vtorc/process"
915
)
1016

1117
func TestWaitForLocksRelease(t *testing.T) {
@@ -54,3 +60,49 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
5460
waitForLocksRelease()
5561
return time.Since(start)
5662
}
63+
64+
func TestRefreshAllInformation(t *testing.T) {
65+
defer process.ResetLastHealthCheckCache()
66+
67+
// Store the old topo server and restore it on test completion
68+
oldTs := ts
69+
defer func() {
70+
ts = oldTs
71+
}()
72+
73+
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
74+
defer func() {
75+
db.ClearVTOrcDatabase()
76+
}()
77+
78+
// Run an initial health check before any refresh has been attempted; only the absence of an error is asserted here.
79+
_, err := process.HealthTest()
80+
require.NoError(t, err)
81+
82+
// Create a memory topo-server and create the keyspace and shard records
83+
ts = memorytopo.NewServer(context.Background(), cell1)
84+
_, err = ts.GetOrCreateShard(context.Background(), keyspace, shard)
85+
require.NoError(t, err)
86+
87+
// Test error
88+
ctx, cancel := context.WithCancel(context.Background())
89+
cancel() // cancel context to simulate timeout
90+
require.Error(t, refreshAllInformation(ctx))
91+
require.False(t, process.FirstDiscoveryCycleComplete.Load())
92+
health, err := process.HealthTest()
93+
require.NoError(t, err)
94+
require.False(t, health.DiscoveredOnce)
95+
require.False(t, health.Healthy)
96+
process.ResetLastHealthCheckCache()
97+
98+
// Test success
99+
ctx2, cancel2 := context.WithCancel(context.Background())
100+
defer cancel2()
101+
require.NoError(t, refreshAllInformation(ctx2))
102+
require.True(t, process.FirstDiscoveryCycleComplete.Load())
103+
health, err = process.HealthTest()
104+
require.NoError(t, err)
105+
require.True(t, health.DiscoveredOnce)
106+
require.True(t, health.Healthy)
107+
process.ResetLastHealthCheckCache()
108+
}

go/vt/vtorc/process/health.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ var FirstDiscoveryCycleComplete atomic.Bool
3636

3737
var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second)
3838

39+
// ResetLastHealthCheckCache flushes the package-level lastHealthCheckCache.
// The tests in this change call it between HealthTest invocations.
func ResetLastHealthCheckCache() { lastHealthCheckCache.Flush() }
40+
3941
type NodeHealth struct {
4042
Hostname string
4143
Token string
@@ -120,8 +122,8 @@ func HealthTest() (health *HealthStatus, err error) {
120122
log.Error(err)
121123
return health, err
122124
}
123-
health.Healthy = healthy
124125
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
126+
health.Healthy = healthy && health.DiscoveredOnce
125127

126128
if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil {
127129
health.Error = err

go/vt/vtorc/process/health_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2024 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package process
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/require"
23+
_ "modernc.org/sqlite"
24+
)
25+
26+
func TestHealthTest(t *testing.T) {
27+
defer func() {
28+
FirstDiscoveryCycleComplete.Store(false)
29+
ThisNodeHealth = &NodeHealth{}
30+
ResetLastHealthCheckCache()
31+
}()
32+
33+
require.Zero(t, ThisNodeHealth.LastReported)
34+
35+
ThisNodeHealth = &NodeHealth{}
36+
health, err := HealthTest()
37+
require.NoError(t, err)
38+
require.False(t, health.Healthy)
39+
require.False(t, health.DiscoveredOnce)
40+
require.NotZero(t, ThisNodeHealth.LastReported)
41+
ResetLastHealthCheckCache()
42+
43+
ThisNodeHealth = &NodeHealth{}
44+
FirstDiscoveryCycleComplete.Store(true)
45+
health, err = HealthTest()
46+
require.NoError(t, err)
47+
require.True(t, health.Healthy)
48+
require.True(t, health.DiscoveredOnce)
49+
require.NotZero(t, ThisNodeHealth.LastReported)
50+
ResetLastHealthCheckCache()
51+
}

0 commit comments

Comments
 (0)