Skip to content
Open
232 changes: 231 additions & 1 deletion balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpcsync"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
Expand Down Expand Up @@ -910,9 +911,25 @@ func (s) TestDataCachePurging(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// wrappingConnectivityStateSubscriber wraps a grpcsync.Subscriber and forwards
// connectivity state updates to both the delegate and a channel for testing.
type wrappingConnectivityStateSubscriber struct {
delegate grpcsync.Subscriber
connStateCh chan connectivity.State
}

func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
w.delegate.OnMessage(msg)
w.connStateCh <- msg.(connectivity.State)
}

// TestControlChannelConnectivityStateMonitoring tests the scenario where the
// control channel goes down and comes back up again and verifies that backoff
// state is reset for cache entries in this scenario.
// state is reset for cache entries in this scenario. It also verifies that:
// - Backoff is NOT reset when the control channel first becomes READY (i.e.,
// the initial CONNECTING → READY transition should not trigger a backoff reset)
// - Backoff is NOT reset for READY → IDLE → READY transitions (benign state changes)
// - Backoff IS reset for READY → TRANSIENT_FAILURE → READY transitions
func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
Expand All @@ -939,6 +956,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
defer func() { defaultBackoffStrategy = origBackoffStrategy }()

// Override the connectivity state subscriber to wrap the original and
// make connectivity state changes visible to the test.
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)}
origConnectivityStateSubscriber := newConnectivityStateSubscriber
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
wrappedSubscriber.delegate = delegate
return wrappedSubscriber
}
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()

// Register an LB policy to act as the child policy for RLS LB policy.
childPolicyName := "test-child-policy" + t.Name()
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
Expand Down Expand Up @@ -973,6 +1000,29 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh, true)

// Verify that the control channel moves to READY.
wantStates := []connectivity.State{connectivity.Connecting, connectivity.Ready}
for _, wantState := range wantStates {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState != wantState {
t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
}
}

// Verify that the initial READY state of the control channel did NOT trigger
// a backoff reset. The resetBackoffHook should only be called when
// transitioning from TRANSIENT_FAILURE to READY, not for the initial
// CONNECTING → READY transition.
select {
case <-resetBackoffDone:
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
case <-time.After(10 * time.Millisecond):
}

// Stop the RLS server.
lis.Stop()

Expand All @@ -984,6 +1034,21 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// of the test.
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)

// Wait for the control channel to move to TRANSIENT_FAILURE. When the server
// is stopped, we expect the control channel to go through Connecting and
// eventually reach TransientFailure.
transientFailureLoop:
for {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState == connectivity.TransientFailure {
break transientFailureLoop
}
case <-ctx.Done():
t.Fatal("Timeout waiting for RLS control channel to become TRANSIENT_FAILURE")
}
}

// Restart the RLS server.
lis.Restart()

Expand All @@ -1001,6 +1066,20 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)

// Wait for the control channel to move back to READY.
readyAfterTransientFailureLoop:
for {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState == connectivity.Ready {
break readyAfterTransientFailureLoop
}
case <-ctx.Done():
t.Fatal("Timeout waiting for RLS control channel to become READY after TRANSIENT_FAILURE")
}
}

// Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY.
select {
case <-ctx.Done():
t.Fatalf("Timed out waiting for resetBackoffDone")
Expand All @@ -1016,6 +1095,157 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// TestControlChannelIdleTransitionNoBackoffReset tests that READY → IDLE → READY
// transitions do not trigger backoff resets. This is a benign state change that
// should not affect cache entry backoff state.
func (s) TestControlChannelIdleTransitionNoBackoffReset(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Start an RLS server with the restartable listener and set the throttler to
// never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Override the reset backoff hook to get notified.
resetBackoffCalled := make(chan struct{}, 1)
origResetBackoffHook := resetBackoffHook
resetBackoffHook = func() { resetBackoffCalled <- struct{}{} }
defer func() { resetBackoffHook = origResetBackoffHook }()

// Override the backoff strategy to return a large backoff which
// will make sure the data cache entry remains in backoff for the
// duration of the test.
origBackoffStrategy := defaultBackoffStrategy
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
defer func() { defaultBackoffStrategy = origBackoffStrategy }()

// Override the connectivity state subscriber to wrap the original and
// make connectivity state changes visible to the test.
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)}
origConnectivityStateSubscriber := newConnectivityStateSubscriber
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
wrappedSubscriber.delegate = delegate
return wrappedSubscriber
}
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()

// Register an LB policy to act as the child policy for RLS LB policy.
childPolicyName := "test-child-policy" + t.Name()
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
t.Logf("Registered child policy with name %q", childPolicyName)

// Build RLS service config with header matchers, and a very low value for
// maxAge to ensure that cache entries become invalid very soon.
rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)
rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)

// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create gRPC client: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the test backend.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh)

// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh, true)

// Wait for the control channel to move to READY.
for {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState == connectivity.Ready {
goto initialReadyReached
}
case <-ctx.Done():
t.Fatal("Timeout waiting for RLS control channel to become READY")
}
}
initialReadyReached:

// Verify that the initial READY state did NOT trigger a backoff reset.
select {
case <-resetBackoffCalled:
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
default:
}

// Stop the RLS server to force the control channel to go IDLE. We use Stop()
// which closes connections gracefully, allowing the channel to transition
// to IDLE rather than TRANSIENT_FAILURE.
lis.Stop()

// Wait for the control channel to move to IDLE.
for {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState == connectivity.Idle {
goto idleReached
}
// If we see TRANSIENT_FAILURE before IDLE, that's also acceptable
// for this test - we just need to verify READY->IDLE->READY doesn't
// trigger reset. Skip this iteration if we don't see IDLE.
if gotState == connectivity.TransientFailure {
// This test is specifically for IDLE transitions. If we hit
// TRANSIENT_FAILURE, the other test covers that case.
t.Skip("Control channel went to TRANSIENT_FAILURE instead of IDLE; skipping IDLE transition test")
}
case <-ctx.Done():
t.Fatal("Timeout waiting for RLS control channel to become IDLE")
}
}
idleReached:

// Restart the RLS server.
lis.Restart()

// Make another RPC to trigger reconnection. Use different headers to create
// a new cache entry.
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)

// Wait for the control channel to move back to READY.
for {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState == connectivity.Ready {
goto readyAfterIdle
}
case <-ctx.Done():
t.Fatal("Timeout waiting for RLS control channel to become READY after IDLE")
}
}
readyAfterIdle:

// Verify that the READY → IDLE → READY transition did NOT trigger a backoff reset.
// This is the key assertion of this test.
select {
case <-resetBackoffCalled:
t.Fatal("Backoff reset was triggered for READY → IDLE → READY transition, want no reset")
default:
// Good - no backoff reset was triggered for this benign transition.
}
}

// testCCWrapper wraps a balancer.ClientConn and overrides UpdateState and
// stores all state updates pushed by the RLS LB policy.
type testCCWrapper struct {
Expand Down
Loading