diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index fd53b731defd..b2949ed92824 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -38,6 +38,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/stub"
+	"google.golang.org/grpc/internal/grpcsync"
 	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
 	"google.golang.org/grpc/internal/testutils"
 	rlstest "google.golang.org/grpc/internal/testutils/rls"
@@ -910,9 +911,25 @@ func (s) TestDataCachePurging(t *testing.T) {
 	verifyRLSRequest(t, rlsReqCh, true)
 }
 
+// wrappingConnectivityStateSubscriber wraps a grpcsync.Subscriber and forwards
+// connectivity state updates to both the delegate and a channel for testing.
+type wrappingConnectivityStateSubscriber struct {
+	delegate    grpcsync.Subscriber
+	connStateCh chan connectivity.State
+}
+
+func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
+	w.delegate.OnMessage(msg)
+	w.connStateCh <- msg.(connectivity.State)
+}
+
 // TestControlChannelConnectivityStateMonitoring tests the scenario where the
 // control channel goes down and comes back up again and verifies that backoff
-// state is reset for cache entries in this scenario.
+// state is reset for cache entries in this scenario. It also verifies that:
+//   - backoff is NOT reset when the control channel first becomes READY (i.e.,
+//     the initial CONNECTING → READY transition should not trigger a backoff reset)
+//   - backoff is NOT reset for READY → IDLE → READY transitions (benign state changes)
+//   - backoff IS reset for READY → TRANSIENT_FAILURE → READY transitions
 func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Create a restartable listener which can close existing connections.
 	l, err := testutils.LocalTCPListener()
@@ -939,6 +956,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
 	defer func() { defaultBackoffStrategy = origBackoffStrategy }()
 
+	// Override the connectivity state subscriber to wrap the original and
+	// make connectivity state changes visible to the test.
+	wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)}
+	origConnectivityStateSubscriber := newConnectivityStateSubscriber
+	newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
+		wrappedSubscriber.delegate = delegate
+		return wrappedSubscriber
+	}
+	defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
+
 	// Register an LB policy to act as the child policy for RLS LB policy.
 	childPolicyName := "test-child-policy" + t.Name()
 	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
@@ -973,6 +1000,29 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Make sure an RLS request is sent out.
 	verifyRLSRequest(t, rlsReqCh, true)
 
+	// Verify that the control channel moves through CONNECTING to READY.
+	wantStates := []connectivity.State{connectivity.Connecting, connectivity.Ready}
+	for _, wantState := range wantStates {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState != wantState {
+				t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
+			}
+		case <-ctx.Done():
+			t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
+		}
+	}
+
+	// Verify that the initial READY state of the control channel did NOT trigger
+	// a backoff reset. The resetBackoffHook should only be called when
+	// transitioning from TRANSIENT_FAILURE to READY, not for the initial
+	// CONNECTING → READY transition.
+	select {
+	case <-resetBackoffDone:
+		t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
+	case <-time.After(10 * time.Millisecond):
+	}
+
 	// Stop the RLS server.
 	lis.Stop()
 
@@ -984,6 +1034,21 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// of the test.
 	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
 
+	// Wait for the control channel to move to TRANSIENT_FAILURE. When the server
+	// is stopped, we expect the control channel to go through CONNECTING and
+	// eventually reach TRANSIENT_FAILURE.
+transientFailureLoop:
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.TransientFailure {
+				break transientFailureLoop
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become TRANSIENT_FAILURE")
+		}
+	}
+
 	// Restart the RLS server.
 	lis.Restart()
 
@@ -1001,6 +1066,20 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
 	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
 
+	// Wait for the control channel to move back to READY.
+readyAfterTransientFailureLoop:
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Ready {
+				break readyAfterTransientFailureLoop
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become READY after TRANSIENT_FAILURE")
+		}
+	}
+
+	// Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY.
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Timed out waiting for resetBackoffDone")
@@ -1016,6 +1095,157 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	verifyRLSRequest(t, rlsReqCh, true)
 }
 
+// TestControlChannelIdleTransitionNoBackoffReset tests that READY → IDLE → READY
+// transitions do not trigger backoff resets. This is a benign state change that
+// should not affect cache entry backoff state.
+func (s) TestControlChannelIdleTransitionNoBackoffReset(t *testing.T) {
+	// Create a restartable listener which can close existing connections.
+	l, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("net.Listen() failed: %v", err)
+	}
+	lis := testutils.NewRestartableListener(l)
+
+	// Start an RLS server with the restartable listener and set the throttler to
+	// never throttle requests.
+	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Override the reset backoff hook to get notified.
+	resetBackoffCalled := make(chan struct{}, 1)
+	origResetBackoffHook := resetBackoffHook
+	resetBackoffHook = func() { resetBackoffCalled <- struct{}{} }
+	defer func() { resetBackoffHook = origResetBackoffHook }()
+
+	// Override the backoff strategy to return a large backoff, which makes
+	// sure that the data cache entry remains in backoff for the duration of
+	// the test.
+	origBackoffStrategy := defaultBackoffStrategy
+	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
+	defer func() { defaultBackoffStrategy = origBackoffStrategy }()
+
+	// Override the connectivity state subscriber to wrap the original and
+	// make connectivity state changes visible to the test.
+	wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)}
+	origConnectivityStateSubscriber := newConnectivityStateSubscriber
+	newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
+		wrappedSubscriber.delegate = delegate
+		return wrappedSubscriber
+	}
+	defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
+
+	// Register an LB policy to act as the child policy for RLS LB policy.
+	childPolicyName := "test-child-policy" + t.Name()
+	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
+	t.Logf("Registered child policy with name %q", childPolicyName)
+
+	// Build RLS service config with header matchers, and a very low value for
+	// maxAge to ensure that cache entries become invalid very soon.
+	rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)
+	rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
+
+	// Start a test backend, and set up the fake RLS server to return this as a
+	// target in the RLS response.
+	backendCh, backendAddress := startBackend(t)
+	rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
+	})
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := startManualResolverWithConfig(t, rlsConfig)
+
+	cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Failed to create gRPC client: %v", err)
+	}
+	defer cc.Close()
+
+	// Make an RPC and ensure it gets routed to the test backend.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh)
+
+	// Make sure an RLS request is sent out.
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	// Wait for the control channel to move to READY.
+initialReadyLoop:
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Ready {
+				break initialReadyLoop
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become READY")
+		}
+	}
+
+	// Verify that the initial READY state did NOT trigger a backoff reset.
+	select {
+	case <-resetBackoffCalled:
+		t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
+	default:
+	}
+
+	// Stop the RLS server to force the control channel to go IDLE. We use Stop(),
+	// which closes connections gracefully, allowing the channel to transition
+	// to IDLE rather than TRANSIENT_FAILURE.
+	lis.Stop()
+
+	// Wait for the control channel to move to IDLE.
+idleLoop:
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Idle {
+				break idleLoop
+			}
+			// This test specifically exercises the READY → IDLE → READY
+			// transition. If the control channel reports TRANSIENT_FAILURE
+			// instead of IDLE, this run cannot exercise that transition; the
+			// TRANSIENT_FAILURE case is covered by
+			// TestControlChannelConnectivityStateMonitoring, so skip.
+			if gotState == connectivity.TransientFailure {
+				t.Skip("Control channel went to TRANSIENT_FAILURE instead of IDLE; skipping IDLE transition test")
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become IDLE")
+		}
+	}
+
+	// Restart the RLS server.
+	lis.Restart()
+
+	// Make another RPC to trigger reconnection. Use different headers to create
+	// a new cache entry.
+	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
+	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
+
+	// Wait for the control channel to move back to READY.
+readyAfterIdleLoop:
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Ready {
+				break readyAfterIdleLoop
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become READY after IDLE")
+		}
+	}
+
+	// Verify that the READY → IDLE → READY transition did NOT trigger a backoff
+	// reset. This is the key assertion of this test.
+	select {
+	case <-resetBackoffCalled:
+		t.Fatal("Backoff reset was triggered for READY → IDLE → READY transition, want no reset")
+	default:
+		// Good: no backoff reset was triggered for this benign transition.
+	}
+}
+
 // testCCWrapper wraps a balancer.ClientConn and overrides UpdateState and
 // stores all state updates pushed by the RLS LB policy.
 type testCCWrapper struct {
diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go
index 60e6a021d133..b9dc2e9f7537 100644
--- a/balancer/rls/control_channel.go
+++ b/balancer/rls/control_channel.go
@@ -21,6 +21,7 @@ package rls
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"google.golang.org/grpc"
@@ -29,7 +30,6 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
-	"google.golang.org/grpc/internal/buffer"
 	internalgrpclog "google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/pretty"
@@ -39,6 +39,12 @@ import (
 var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
 
+// newConnectivityStateSubscriber can be overridden in tests to wrap the
+// control channel's connectivity state subscriber.
+var newConnectivityStateSubscriber = func(sub grpcsync.Subscriber) grpcsync.Subscriber {
+	return sub
+}
+
 type adaptiveThrottler interface {
 	ShouldThrottle() bool
 	RegisterBackendResponse(throttled bool)
 }
@@ -57,12 +63,14 @@ type controlChannel struct {
 	// hammering the RLS service while it is overloaded or down.
 	throttler adaptiveThrottler
 
-	cc                  *grpc.ClientConn
-	client              rlsgrpc.RouteLookupServiceClient
-	logger              *internalgrpclog.PrefixLogger
-	connectivityStateCh *buffer.Unbounded
-	unsubscribe         func()
-	monitorDoneCh       chan struct{}
+	cc          *grpc.ClientConn
+	client      rlsgrpc.RouteLookupServiceClient
+	logger      *internalgrpclog.PrefixLogger
+	unsubscribe func()
+
+	// All fields below are guarded by mu.
+	mu                   sync.Mutex
+	seenTransientFailure bool
 }
 
 // newControlChannel creates a controlChannel to rlsServerName and uses
@@ -70,11 +78,9 @@ type controlChannel struct {
 // gRPC channel.
 func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
 	ctrlCh := &controlChannel{
-		rpcTimeout:          rpcTimeout,
-		backToReadyFunc:     backToReadyFunc,
-		throttler:           newAdaptiveThrottler(),
-		connectivityStateCh: buffer.NewUnbounded(),
-		monitorDoneCh:       make(chan struct{}),
+		rpcTimeout:      rpcTimeout,
+		backToReadyFunc: backToReadyFunc,
+		throttler:       newAdaptiveThrottler(),
 	}
 	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
 
@@ -88,11 +94,10 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura
 	}
 	// Subscribe to connectivity state before connecting to avoid missing initial
 	// updates, which are only delivered to active subscribers.
-	ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh)
+	ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh))
 	ctrlCh.cc.Connect()
 	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
 	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
-	go ctrlCh.monitorConnectivityState()
 	return ctrlCh, nil
 }
 
@@ -101,7 +106,40 @@ func (cc *controlChannel) OnMessage(msg any) {
 	if !ok {
 		panic(fmt.Sprintf("Unexpected message type %T, wanted connectivity.State type", msg))
 	}
-	cc.connectivityStateCh.Put(st)
+
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+
+	switch st {
+	case connectivity.Ready:
+		// Only reset backoff when transitioning from TRANSIENT_FAILURE to READY.
+		// This indicates the RLS server has recovered from being unreachable, so
+		// we reset backoff state in all cache entries to allow pending RPCs to
+		// proceed immediately. We skip benign transitions like READY → IDLE → READY
+		// since those don't represent actual failures.
+		if cc.seenTransientFailure {
+			if cc.logger.V(2) {
+				cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE")
+			}
+			cc.seenTransientFailure = false
+			if cc.backToReadyFunc != nil {
+				cc.backToReadyFunc()
+			}
+		} else {
+			if cc.logger.V(2) {
+				cc.logger.Infof("Control channel is READY")
+			}
+		}
+	case connectivity.TransientFailure:
+		// Track that we've entered TRANSIENT_FAILURE state so we know to reset
+		// backoffs when we recover to READY.
+		cc.logger.Warningf("Control channel is TRANSIENT_FAILURE")
+		cc.seenTransientFailure = true
+	default:
+		if cc.logger.V(2) {
+			cc.logger.Infof("Control channel connectivity state is %s", st)
+		}
+	}
 }
 
 // dialOpts constructs the dial options for the control plane channel.
@@ -148,68 +186,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
 	return dopts, nil
 }
 
-func (cc *controlChannel) monitorConnectivityState() {
-	cc.logger.Infof("Starting connectivity state monitoring goroutine")
-	defer close(cc.monitorDoneCh)
-
-	// Since we use two mechanisms to deal with RLS server being down:
-	// - adaptive throttling for the channel as a whole
-	// - exponential backoff on a per-request basis
-	// we need a way to avoid double-penalizing requests by counting failures
-	// toward both mechanisms when the RLS server is unreachable.
-	//
-	// To accomplish this, we monitor the state of the control plane channel. If
-	// the state has been TRANSIENT_FAILURE since the last time it was in state
-	// READY, and it then transitions into state READY, we push on a channel
-	// which is being read by the LB policy.
-	//
-	// The LB the policy will iterate through the cache to reset the backoff
-	// timeouts in all cache entries. Specifically, this means that it will
-	// reset the backoff state and cancel the pending backoff timer. Note that
-	// when cancelling the backoff timer, just like when the backoff timer fires
-	// normally, a new picker is returned to the channel, to force it to
-	// re-process any wait-for-ready RPCs that may still be queued if we failed
-	// them while we were in backoff. However, we should optimize this case by
-	// returning only one new picker, regardless of how many backoff timers are
-	// cancelled.
-
-	// Wait for the control channel to become READY for the first time.
-	for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() {
-		if !ok {
-			return
-		}
-
-		cc.connectivityStateCh.Load()
-		if s == connectivity.Shutdown {
-			return
-		}
-	}
-	cc.connectivityStateCh.Load()
-	cc.logger.Infof("Connectivity state is READY")
-
-	for {
-		s, ok := <-cc.connectivityStateCh.Get()
-		if !ok {
-			return
-		}
-		cc.connectivityStateCh.Load()
-
-		if s == connectivity.Shutdown {
-			return
-		}
-		if s == connectivity.Ready {
-			cc.logger.Infof("Control channel back to READY")
-			cc.backToReadyFunc()
-		}
-
-		cc.logger.Infof("Connectivity state is %s", s)
-	}
-}
-
 func (cc *controlChannel) close() {
 	cc.unsubscribe()
-	cc.connectivityStateCh.Close()
-	<-cc.monitorDoneCh
 	cc.cc.Close()
 	cc.logger.Infof("Shutdown")
 }
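For reference, a minimal standalone sketch of the transition rule that the new OnMessage implements: the back-to-ready hook fires only on a TRANSIENT_FAILURE → READY edge, not for the initial READY and not for benign READY → IDLE → READY cycles. The stateTracker type and its names are illustrative only and not part of this change or of the grpc-go API:

package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// stateTracker mirrors the seenTransientFailure logic in controlChannel:
// invoke onBackToReady only on a TRANSIENT_FAILURE → READY transition.
type stateTracker struct {
	seenTransientFailure bool
	onBackToReady        func()
}

func (t *stateTracker) update(s connectivity.State) {
	switch s {
	case connectivity.TransientFailure:
		// Remember that the channel was down.
		t.seenTransientFailure = true
	case connectivity.Ready:
		// Fire only when recovering from TRANSIENT_FAILURE; the initial
		// READY and READY → IDLE → READY cycles do not trigger the hook.
		if t.seenTransientFailure {
			t.seenTransientFailure = false
			t.onBackToReady()
		}
	}
}

func main() {
	tr := &stateTracker{onBackToReady: func() { fmt.Println("backoff reset") }}
	for _, s := range []connectivity.State{
		connectivity.Connecting, connectivity.Ready, // initial READY: no reset
		connectivity.Idle, connectivity.Ready, // benign IDLE cycle: no reset
		connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready, // reset fires once
	} {
		tr.update(s)
	}
	// Output: backoff reset
}

Because this edge detection reduces to a single boolean guarded by a mutex, it can run synchronously in OnMessage, which is what lets the diff drop the unbounded buffer and the monitoring goroutine entirely.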