From a6fcb7e0af72182edfe4d14b74587ea257597c37 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Thu, 20 Nov 2025 22:15:13 +0300 Subject: [PATCH 01/13] rls: only reset backoff on recovery from TRANSIENT_FAILURE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix control channel connectivity monitoring to track TRANSIENT_FAILURE state explicitly. Only reset backoff timers when transitioning from TRANSIENT_FAILURE to READY, not for benign state changes like READY → IDLE → READY. Fixes #8693 --- balancer/rls/control_channel.go | 21 ++++++- balancer/rls/control_channel_test.go | 93 ++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 60e6a021d133..5834e56b8237 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -187,6 +187,11 @@ func (cc *controlChannel) monitorConnectivityState() { cc.connectivityStateCh.Load() cc.logger.Infof("Connectivity state is READY") + // Track whether we've seen TRANSIENT_FAILURE since the last READY state. + // We only want to reset backoff when recovering from an actual failure, + // not when transitioning through benign states like IDLE. + seenTransientFailure := false + for { s, ok := <-cc.connectivityStateCh.Get() if !ok { @@ -197,9 +202,21 @@ func (cc *controlChannel) monitorConnectivityState() { if s == connectivity.Shutdown { return } + + // Track if we've entered TRANSIENT_FAILURE state + if s == connectivity.TransientFailure { + seenTransientFailure = true + } + + // Only reset backoff if we're returning to READY after a failure if s == connectivity.Ready { - cc.logger.Infof("Control channel back to READY") - cc.backToReadyFunc() + if seenTransientFailure { + cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + cc.backToReadyFunc() + seenTransientFailure = false + } else { + cc.logger.Infof("Control channel back to READY (no prior failure)") + } } cc.logger.Infof("Connectivity state is %s", s) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 5a30820c3b47..1933868b7952 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -26,6 +26,7 @@ import ( "fmt" "os" "regexp" + "sync" "testing" "time" @@ -33,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" @@ -463,3 +465,94 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { t.Fatal("newControlChannel succeeded when expected to fail") } } + +// TestControlChannelConnectivityStateTransitions verifies that the control +// channel only resets backoff when recovering from TRANSIENT_FAILURE, not +// when going through benign state changes like READY → IDLE → READY. 
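For readers skimming the diff, the logic patch 01 adds boils down to a small state machine. A minimal standalone sketch, with illustrative names only (State, tracker, and onRecovery are stand-ins for this sketch, not the balancer's actual types):

package main

import "fmt"

// State stands in for connectivity.State in this sketch.
type State int

const (
	Ready State = iota
	Idle
	Connecting
	TransientFailure
)

// tracker invokes onRecovery only on a TRANSIENT_FAILURE -> READY recovery.
type tracker struct {
	seenTransientFailure bool
	onRecovery           func()
}

func (t *tracker) onStateChange(s State) {
	switch s {
	case TransientFailure:
		t.seenTransientFailure = true
	case Ready:
		if t.seenTransientFailure {
			t.seenTransientFailure = false
			t.onRecovery() // reset backoff in cache entries
		}
	}
}

func main() {
	tr := &tracker{onRecovery: func() { fmt.Println("backoff reset") }}
	// READY → IDLE → READY is benign; only the failure recovery fires.
	for _, s := range []State{Ready, Idle, Ready, TransientFailure, Ready} {
		tr.onStateChange(s)
	}
	// Prints "backoff reset" exactly once.
}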
+func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { + tests := []struct { + name string + states []connectivity.State + wantCallbackCount int + }{ + { + name: "READY → TRANSIENT_FAILURE → READY triggers callback", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Ready, + }, + wantCallbackCount: 1, + }, + { + name: "READY → IDLE → READY does not trigger callback", + states: []connectivity.State{ + connectivity.Idle, + connectivity.Ready, + }, + wantCallbackCount: 0, + }, + { + name: "Multiple failures trigger callback each time", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Ready, + connectivity.TransientFailure, + connectivity.Ready, + }, + wantCallbackCount: 2, + }, + { + name: "IDLE between failures doesn't affect callback", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Idle, + connectivity.Ready, + }, + wantCallbackCount: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Start an RLS server + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) + + // Setup callback to count invocations + callbackCount := 0 + var mu sync.Mutex + callback := func() { + mu.Lock() + callbackCount++ + mu.Unlock() + } + + // Create control channel + ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) + if err != nil { + t.Fatalf("Failed to create control channel: %v", err) + } + defer ctrlCh.close() + + // Give the channel time to reach initial READY state + time.Sleep(100 * time.Millisecond) + + // Inject the test state sequence + for _, state := range tt.states { + ctrlCh.OnMessage(state) + // Give time for the monitoring goroutine to process the state + time.Sleep(50 * time.Millisecond) + } + + // Give extra time for any pending callbacks + time.Sleep(100 * time.Millisecond) + + mu.Lock() + gotCallbackCount := callbackCount + mu.Unlock() + + if gotCallbackCount != tt.wantCallbackCount { + t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) + } + }) + } +} From ca49ae7c502d2c4b71588030bd4509117bcffa19 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:13:00 +0300 Subject: [PATCH 02/13] address reviews --- balancer/rls/control_channel.go | 4 ++- balancer/rls/control_channel_test.go | 43 +++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 5834e56b8237..3f39ad74eacd 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -208,7 +208,9 @@ func (cc *controlChannel) monitorConnectivityState() { seenTransientFailure = true } - // Only reset backoff if we're returning to READY after a failure + // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. + // This prevents unnecessary backoff resets for benign state transitions + // like READY → IDLE → READY, which don't represent actual failures. 
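	// Illustrative sequences (the intermediate CONNECTING states are assumed
	// here and may vary):
	//
	//	READY → TRANSIENT_FAILURE → CONNECTING → READY  => backoff reset
	//	READY → IDLE → CONNECTING → READY               => no reset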
if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 1933868b7952..6e0d39cd1b75 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -517,13 +517,15 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Start an RLS server rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) - // Setup callback to count invocations + // Setup callback to count invocations with synchronization callbackCount := 0 var mu sync.Mutex + callbackInvoked := make(chan struct{}, 10) callback := func() { mu.Lock() callbackCount++ mu.Unlock() + callbackInvoked <- struct{}{} } // Create control channel @@ -533,18 +535,45 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Give the channel time to reach initial READY state - time.Sleep(100 * time.Millisecond) + // Wait for initial READY state by checking connectivity state buffer + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + initialReady := false + for !initialReady { + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial READY state") + default: + if ctrlCh.cc.GetState() == connectivity.Ready { + initialReady = true + } else { + time.Sleep(10 * time.Millisecond) + } + } + } // Inject the test state sequence for _, state := range tt.states { ctrlCh.OnMessage(state) - // Give time for the monitoring goroutine to process the state - time.Sleep(50 * time.Millisecond) } - // Give extra time for any pending callbacks - time.Sleep(100 * time.Millisecond) + // Wait for expected callbacks to be invoked + for i := 0; i < tt.wantCallbackCount; i++ { + select { + case <-callbackInvoked: + // Callback received as expected + case <-time.After(defaultTestTimeout): + t.Fatalf("Timeout waiting for callback %d/%d", i+1, tt.wantCallbackCount) + } + } + + // Ensure no extra callbacks are invoked + select { + case <-callbackInvoked: + t.Fatal("Received more callbacks than expected") + case <-time.After(100 * time.Millisecond): + // Expected: no more callbacks + } mu.Lock() gotCallbackCount := callbackCount From c7eb61886ca60b7ab4afd548f97c603c8314e6c3 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:44:27 +0300 Subject: [PATCH 03/13] address reviews --- balancer/rls/control_channel.go | 16 ++++- balancer/rls/control_channel_test.go | 93 ++++++++++++++++++---------- 2 files changed, 75 insertions(+), 34 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 3f39ad74eacd..6e9304aea97d 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -209,12 +209,22 @@ func (cc *controlChannel) monitorConnectivityState() { } // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. - // This prevents unnecessary backoff resets for benign state transitions - // like READY → IDLE → READY, which don't represent actual failures. + // When the control channel enters TRANSIENT_FAILURE, it indicates the RLS + // server is unreachable or experiencing issues. When it then transitions to + // READY, we reset the backoff state in all cache entries to allow pending + // RPCs to proceed immediately, rather than waiting for their individual + // backoff timers to expire. 
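	// Concretely, resetting backoff means the LB policy iterates through the
	// cache, clears each entry's backoff state and cancels its pending backoff
	// timer; as when a backoff timer fires normally, a new picker is then
	// returned to the channel so queued wait-for-ready RPCs are re-processed.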
+ // + // We skip resetting backoff for benign state transitions like READY → IDLE + // → READY (which occur during normal operation due to connection idleness) + // because these don't represent actual failures that would justify clearing + // the backoff state. if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") - cc.backToReadyFunc() + if cc.backToReadyFunc != nil { + cc.backToReadyFunc() + } seenTransientFailure = false } else { cc.logger.Infof("Control channel back to READY (no prior failure)") diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 6e0d39cd1b75..90a3a124af2a 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -517,15 +517,20 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Start an RLS server rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) - // Setup callback to count invocations with synchronization - callbackCount := 0 + // Setup callback to count invocations var mu sync.Mutex - callbackInvoked := make(chan struct{}, 10) + var callbackCount int + // Buffered channel to collect callback invocations without blocking + callbackInvoked := make(chan struct{}, tt.wantCallbackCount+5) callback := func() { mu.Lock() callbackCount++ mu.Unlock() - callbackInvoked <- struct{}{} + // Non-blocking send - if channel is full, we still counted it + select { + case callbackInvoked <- struct{}{}: + default: + } } // Create control channel @@ -535,46 +540,61 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for initial READY state by checking connectivity state buffer + // Wait for initial READY state using state change notifications ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - initialReady := false - for !initialReady { - select { - case <-ctx.Done(): - t.Fatal("Timeout waiting for initial READY state") - default: - if ctrlCh.cc.GetState() == connectivity.Ready { - initialReady = true - } else { - time.Sleep(10 * time.Millisecond) + + readyCh := make(chan struct{}) + go func() { + for { + state := ctrlCh.cc.GetState() + if state == connectivity.Ready { + close(readyCh) + return + } + if !ctrlCh.cc.WaitForStateChange(ctx, state) { + return } } + }() + + select { + case <-readyCh: + // Initial READY state achieved + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial READY state") } - // Inject the test state sequence + // Process states sequentially, waiting for callbacks when expected + seenTransientFailure := false + expectedCallbacks := 0 + for _, state := range tt.states { + // Inject the state ctrlCh.OnMessage(state) - } - // Wait for expected callbacks to be invoked - for i := 0; i < tt.wantCallbackCount; i++ { - select { - case <-callbackInvoked: - // Callback received as expected - case <-time.After(defaultTestTimeout): - t.Fatalf("Timeout waiting for callback %d/%d", i+1, tt.wantCallbackCount) + // Track if we're in a failure state + if state == connectivity.TransientFailure { + seenTransientFailure = true } - } - // Ensure no extra callbacks are invoked - select { - case <-callbackInvoked: - t.Fatal("Received more callbacks than expected") - case <-time.After(100 * time.Millisecond): - // Expected: no more callbacks + // If transitioning to READY after a failure, wait for callback + if state == connectivity.Ready && seenTransientFailure { + expectedCallbacks++ + select { + 
case <-callbackInvoked: + // Callback received as expected + seenTransientFailure = false + case <-time.After(defaultTestTimeout): + mu.Lock() + got := callbackCount + mu.Unlock() + t.Fatalf("Timeout waiting for callback %d/%d after TRANSIENT_FAILURE→READY (got %d callbacks so far)", expectedCallbacks, tt.wantCallbackCount, got) + } + } } + // Verify final callback count matches expected mu.Lock() gotCallbackCount := callbackCount mu.Unlock() @@ -582,6 +602,17 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { if gotCallbackCount != tt.wantCallbackCount { t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) } + + // Ensure no extra callbacks are invoked + select { + case <-callbackInvoked: + mu.Lock() + final := callbackCount + mu.Unlock() + t.Fatalf("Received more callbacks than expected: got %d, want %d", final, tt.wantCallbackCount) + case <-time.After(50 * time.Millisecond): + // Expected: no more callbacks + } }) } } From 943240dc873dc2cca5c8da8bbc722dc40964ea42 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:47:48 +0300 Subject: [PATCH 04/13] Shorter comment --- balancer/rls/control_channel.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 6e9304aea97d..fc33cfd1a31e 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -208,17 +208,11 @@ func (cc *controlChannel) monitorConnectivityState() { seenTransientFailure = true } - // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. - // When the control channel enters TRANSIENT_FAILURE, it indicates the RLS - // server is unreachable or experiencing issues. When it then transitions to - // READY, we reset the backoff state in all cache entries to allow pending - // RPCs to proceed immediately, rather than waiting for their individual - // backoff timers to expire. - // - // We skip resetting backoff for benign state transitions like READY → IDLE - // → READY (which occur during normal operation due to connection idleness) - // because these don't represent actual failures that would justify clearing - // the backoff state. + // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. + // This indicates the RLS server has recovered from being unreachable, so + // we reset backoff state in all cache entries to allow pending RPCs to + // proceed immediately. We skip benign transitions like READY → IDLE → READY + // since those don't represent actual failures. if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") From ed5ab2c9ae37915866566770ed50f5548ca66913 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 13:19:18 +0300 Subject: [PATCH 05/13] Fix test synchronization and remove time.Sleep dependencies - Add testOnlyInitialReadyDone channel for proper test synchronization - Signal when monitoring goroutine processes initial READY state - Tests wait for this signal instead of using time.Sleep - All synchronization now uses channels/callbacks - no arbitrary sleeps - Tests pass consistently with race detector Addresses review feedback about removing time.Sleep for state transitions. 
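To illustrate the synchronization pattern this commit adopts, here is a minimal, self-contained sketch (hypothetical names; not the actual test code): the goroutine under test closes a channel at the event of interest, and the test selects on that channel against a deadline instead of sleeping.

package example

import (
	"testing"
	"time"
)

func TestWaitForInitialReady(t *testing.T) {
	initialReadyDone := make(chan struct{})

	// Stand-in for the monitoring goroutine: it closes the channel once the
	// initial READY state has been processed.
	go func() {
		close(initialReadyDone)
	}()

	select {
	case <-initialReadyDone:
		// Deterministic: safe to inject further connectivity states here.
	case <-time.After(10 * time.Second):
		t.Fatal("timeout waiting for initial READY to be processed")
	}
}

The same shape works in reverse for "no callback expected" assertions: select on the callback channel with a short timeout and fail if anything arrives.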
--- balancer/rls/control_channel.go | 17 ++++-- balancer/rls/control_channel_test.go | 77 ++++++++++------------------ 2 files changed, 38 insertions(+), 56 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index fc33cfd1a31e..e27a9d4b5d25 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -63,6 +63,9 @@ type controlChannel struct { connectivityStateCh *buffer.Unbounded unsubscribe func() monitorDoneCh chan struct{} + // testOnlyInitialReadyDone is closed when the monitoring goroutine + // processes the initial READY state. Only used in tests. + testOnlyInitialReadyDone chan struct{} } // newControlChannel creates a controlChannel to rlsServerName and uses @@ -70,11 +73,12 @@ type controlChannel struct { // gRPC channel. func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { ctrlCh := &controlChannel{ - rpcTimeout: rpcTimeout, - backToReadyFunc: backToReadyFunc, - throttler: newAdaptiveThrottler(), - connectivityStateCh: buffer.NewUnbounded(), - monitorDoneCh: make(chan struct{}), + rpcTimeout: rpcTimeout, + backToReadyFunc: backToReadyFunc, + throttler: newAdaptiveThrottler(), + connectivityStateCh: buffer.NewUnbounded(), + monitorDoneCh: make(chan struct{}), + testOnlyInitialReadyDone: make(chan struct{}), } ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) @@ -187,6 +191,9 @@ func (cc *controlChannel) monitorConnectivityState() { cc.connectivityStateCh.Load() cc.logger.Infof("Connectivity state is READY") + // Signal tests that initial READY has been processed + close(cc.testOnlyInitialReadyDone) + // Track whether we've seen TRANSIENT_FAILURE since the last READY state. // We only want to reset backoff when recovering from an actual failure, // not when transitioning through benign states like IDLE. diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 90a3a124af2a..2966cf086955 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -520,17 +520,14 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Setup callback to count invocations var mu sync.Mutex var callbackCount int - // Buffered channel to collect callback invocations without blocking - callbackInvoked := make(chan struct{}, tt.wantCallbackCount+5) + // Buffered channel large enough to never block + callbackInvoked := make(chan struct{}, 100) callback := func() { mu.Lock() callbackCount++ mu.Unlock() - // Non-blocking send - if channel is full, we still counted it - select { - case callbackInvoked <- struct{}{}: - default: - } + // Send to channel - should never block with large buffer + callbackInvoked <- struct{}{} } // Create control channel @@ -540,57 +537,35 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for initial READY state using state change notifications - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - readyCh := make(chan struct{}) - go func() { - for { - state := ctrlCh.cc.GetState() - if state == connectivity.Ready { - close(readyCh) - return - } - if !ctrlCh.cc.WaitForStateChange(ctx, state) { - return - } - } - }() - + // Wait for the monitoring goroutine to process the initial READY state + // before injecting test states. 
This ensures our injected states are + // processed in the main monitoring loop, not consumed during initialization. select { - case <-readyCh: - // Initial READY state achieved - case <-ctx.Done(): - t.Fatal("Timeout waiting for initial READY state") + case <-ctrlCh.testOnlyInitialReadyDone: + // Initial READY processed by monitoring goroutine + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for monitoring goroutine to process initial READY state") } - // Process states sequentially, waiting for callbacks when expected - seenTransientFailure := false - expectedCallbacks := 0 - + // Inject all test states for _, state := range tt.states { - // Inject the state ctrlCh.OnMessage(state) + } - // Track if we're in a failure state - if state == connectivity.TransientFailure { - seenTransientFailure = true - } + // Wait for all expected callbacks with timeout + callbackTimeout := time.NewTimer(defaultTestTimeout) + defer callbackTimeout.Stop() - // If transitioning to READY after a failure, wait for callback - if state == connectivity.Ready && seenTransientFailure { - expectedCallbacks++ - select { - case <-callbackInvoked: - // Callback received as expected - seenTransientFailure = false - case <-time.After(defaultTestTimeout): - mu.Lock() - got := callbackCount - mu.Unlock() - t.Fatalf("Timeout waiting for callback %d/%d after TRANSIENT_FAILURE→READY (got %d callbacks so far)", expectedCallbacks, tt.wantCallbackCount, got) - } + receivedCallbacks := 0 + for receivedCallbacks < tt.wantCallbackCount { + select { + case <-callbackInvoked: + receivedCallbacks++ + case <-callbackTimeout.C: + mu.Lock() + got := callbackCount + mu.Unlock() + t.Fatalf("Timeout waiting for callbacks: expected %d, received %d via channel, callback count is %d", tt.wantCallbackCount, receivedCallbacks, got) } } From 2ad8249a8975f1ab1bc5b9b8f84ffa4fabf5eec0 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Sun, 23 Nov 2025 14:11:11 +0300 Subject: [PATCH 06/13] Fix comments --- balancer/rls/control_channel.go | 142 +++++++-------------------- balancer/rls/control_channel_test.go | 10 -- 2 files changed, 38 insertions(+), 114 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index e27a9d4b5d25..de28fc3e9b4e 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -21,6 +21,7 @@ package rls import ( "context" "fmt" + "sync" "time" "google.golang.org/grpc" @@ -29,7 +30,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/buffer" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -57,15 +57,12 @@ type controlChannel struct { // hammering the RLS service while it is overloaded or down. throttler adaptiveThrottler - cc *grpc.ClientConn - client rlsgrpc.RouteLookupServiceClient - logger *internalgrpclog.PrefixLogger - connectivityStateCh *buffer.Unbounded - unsubscribe func() - monitorDoneCh chan struct{} - // testOnlyInitialReadyDone is closed when the monitoring goroutine - // processes the initial READY state. Only used in tests. 
- testOnlyInitialReadyDone chan struct{} + cc *grpc.ClientConn + client rlsgrpc.RouteLookupServiceClient + logger *internalgrpclog.PrefixLogger + unsubscribe func() + seenTransientFailure bool + mu sync.Mutex } // newControlChannel creates a controlChannel to rlsServerName and uses @@ -73,12 +70,9 @@ type controlChannel struct { // gRPC channel. func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { ctrlCh := &controlChannel{ - rpcTimeout: rpcTimeout, - backToReadyFunc: backToReadyFunc, - throttler: newAdaptiveThrottler(), - connectivityStateCh: buffer.NewUnbounded(), - monitorDoneCh: make(chan struct{}), - testOnlyInitialReadyDone: make(chan struct{}), + rpcTimeout: rpcTimeout, + backToReadyFunc: backToReadyFunc, + throttler: newAdaptiveThrottler(), } ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) @@ -96,7 +90,6 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura ctrlCh.cc.Connect() ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) - go ctrlCh.monitorConnectivityState() return ctrlCh, nil } @@ -105,7 +98,34 @@ func (cc *controlChannel) OnMessage(msg any) { if !ok { panic(fmt.Sprintf("Unexpected message type %T , wanted connectectivity.State type", msg)) } - cc.connectivityStateCh.Put(st) + + cc.mu.Lock() + defer cc.mu.Unlock() + + switch st { + case connectivity.Ready: + // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. + // This indicates the RLS server has recovered from being unreachable, so + // we reset backoff state in all cache entries to allow pending RPCs to + // proceed immediately. We skip benign transitions like READY → IDLE → READY + // since those don't represent actual failures. + if cc.seenTransientFailure { + cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + cc.seenTransientFailure = false + if cc.backToReadyFunc != nil { + cc.backToReadyFunc() + } + } else { + cc.logger.Infof("Control channel is READY") + } + case connectivity.TransientFailure: + // Track that we've entered TRANSIENT_FAILURE state so we know to reset + // backoffs when we recover to READY. + cc.logger.Infof("Control channel is TRANSIENT_FAILURE") + cc.seenTransientFailure = true + default: + cc.logger.Infof("Control channel connectivity state is %s", st) + } } // dialOpts constructs the dial options for the control plane channel. @@ -152,94 +172,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st return dopts, nil } -func (cc *controlChannel) monitorConnectivityState() { - cc.logger.Infof("Starting connectivity state monitoring goroutine") - defer close(cc.monitorDoneCh) - - // Since we use two mechanisms to deal with RLS server being down: - // - adaptive throttling for the channel as a whole - // - exponential backoff on a per-request basis - // we need a way to avoid double-penalizing requests by counting failures - // toward both mechanisms when the RLS server is unreachable. - // - // To accomplish this, we monitor the state of the control plane channel. If - // the state has been TRANSIENT_FAILURE since the last time it was in state - // READY, and it then transitions into state READY, we push on a channel - // which is being read by the LB policy. 
- // - // The LB the policy will iterate through the cache to reset the backoff - // timeouts in all cache entries. Specifically, this means that it will - // reset the backoff state and cancel the pending backoff timer. Note that - // when cancelling the backoff timer, just like when the backoff timer fires - // normally, a new picker is returned to the channel, to force it to - // re-process any wait-for-ready RPCs that may still be queued if we failed - // them while we were in backoff. However, we should optimize this case by - // returning only one new picker, regardless of how many backoff timers are - // cancelled. - - // Wait for the control channel to become READY for the first time. - for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() { - if !ok { - return - } - - cc.connectivityStateCh.Load() - if s == connectivity.Shutdown { - return - } - } - cc.connectivityStateCh.Load() - cc.logger.Infof("Connectivity state is READY") - - // Signal tests that initial READY has been processed - close(cc.testOnlyInitialReadyDone) - - // Track whether we've seen TRANSIENT_FAILURE since the last READY state. - // We only want to reset backoff when recovering from an actual failure, - // not when transitioning through benign states like IDLE. - seenTransientFailure := false - - for { - s, ok := <-cc.connectivityStateCh.Get() - if !ok { - return - } - cc.connectivityStateCh.Load() - - if s == connectivity.Shutdown { - return - } - - // Track if we've entered TRANSIENT_FAILURE state - if s == connectivity.TransientFailure { - seenTransientFailure = true - } - - // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. - // This indicates the RLS server has recovered from being unreachable, so - // we reset backoff state in all cache entries to allow pending RPCs to - // proceed immediately. We skip benign transitions like READY → IDLE → READY - // since those don't represent actual failures. - if s == connectivity.Ready { - if seenTransientFailure { - cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") - if cc.backToReadyFunc != nil { - cc.backToReadyFunc() - } - seenTransientFailure = false - } else { - cc.logger.Infof("Control channel back to READY (no prior failure)") - } - } - - cc.logger.Infof("Connectivity state is %s", s) - } -} - func (cc *controlChannel) close() { cc.unsubscribe() - cc.connectivityStateCh.Close() - <-cc.monitorDoneCh cc.cc.Close() cc.logger.Infof("Shutdown") } diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 2966cf086955..dbe41db893a1 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -537,16 +537,6 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for the monitoring goroutine to process the initial READY state - // before injecting test states. This ensures our injected states are - // processed in the main monitoring loop, not consumed during initialization. 
- select { - case <-ctrlCh.testOnlyInitialReadyDone: - // Initial READY processed by monitoring goroutine - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout waiting for monitoring goroutine to process initial READY state") - } - // Inject all test states for _, state := range tt.states { ctrlCh.OnMessage(state) From faee5a0231fc3b656162103232e0306915aa6494 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Mon, 24 Nov 2025 20:39:43 +0300 Subject: [PATCH 07/13] address reviews --- balancer/rls/control_channel.go | 2 +- balancer/rls/control_channel_test.go | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index de28fc3e9b4e..ca98a0cfc9d0 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -121,7 +121,7 @@ func (cc *controlChannel) OnMessage(msg any) { case connectivity.TransientFailure: // Track that we've entered TRANSIENT_FAILURE state so we know to reset // backoffs when we recover to READY. - cc.logger.Infof("Control channel is TRANSIENT_FAILURE") + cc.logger.Warningf("Control channel is TRANSIENT_FAILURE") cc.seenTransientFailure = true default: cc.logger.Infof("Control channel connectivity state is %s", st) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index dbe41db893a1..9f6084bc7319 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -472,28 +472,32 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { tests := []struct { name string + description string states []connectivity.State wantCallbackCount int }{ { - name: "READY → TRANSIENT_FAILURE → READY triggers callback", - states: []connectivity.State{ + name: "ready_after_transient_failure", + description: "ready after transient failure triggers callback to reset the timer.", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, }, wantCallbackCount: 1, }, { - name: "READY → IDLE → READY does not trigger callback", - states: []connectivity.State{ + name: "ready_after_idle", + description: "ready after idle does not trigger callback", + states: []connectivity.State{ connectivity.Idle, connectivity.Ready, }, wantCallbackCount: 0, }, { - name: "Multiple failures trigger callback each time", - states: []connectivity.State{ + name: "multiple_failures", + description: "multiple failures trigger callback each time", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, connectivity.TransientFailure, @@ -502,8 +506,9 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount: 2, }, { - name: "IDLE between failures doesn't affect callback", - states: []connectivity.State{ + name: "idle_between_failures", + description: "idle between failures doesn't affect callback", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Idle, connectivity.Ready, From 5dcc02c8832f4f536d20b26f998d9f26f31617c9 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Mon, 24 Nov 2025 20:52:32 +0300 Subject: [PATCH 08/13] gofmt --- balancer/rls/control_channel_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 9f6084bc7319..e5b61d4ac0d1 100644 --- a/balancer/rls/control_channel_test.go +++ 
b/balancer/rls/control_channel_test.go @@ -477,27 +477,27 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount int }{ { - name: "ready_after_transient_failure", - description: "ready after transient failure triggers callback to reset the timer.", - states: []connectivity.State{ + name: "ready_after_transient_failure", + description: "ready after transient failure triggers callback to reset the timer.", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, }, wantCallbackCount: 1, }, { - name: "ready_after_idle", - description: "ready after idle does not trigger callback", - states: []connectivity.State{ + name: "ready_after_idle", + description: "ready after idle does not trigger callback", + states: []connectivity.State{ connectivity.Idle, connectivity.Ready, }, wantCallbackCount: 0, }, { - name: "multiple_failures", - description: "multiple failures trigger callback each time", - states: []connectivity.State{ + name: "multiple_failures", + description: "multiple failures trigger callback each time", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, connectivity.TransientFailure, @@ -506,9 +506,9 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount: 2, }, { - name: "idle_between_failures", - description: "idle between failures doesn't affect callback", - states: []connectivity.State{ + name: "idle_between_failures", + description: "idle between failures doesn't affect callback", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Idle, connectivity.Ready, From de73b31a8bab62995aac98660f81ce812aa15b42 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Thu, 27 Nov 2025 10:10:44 +0300 Subject: [PATCH 09/13] address reviews --- balancer/rls/control_channel.go | 36 ++- balancer/rls/control_channel_test.go | 343 +++++++++++++++++++-------- 2 files changed, 266 insertions(+), 113 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index ca98a0cfc9d0..e8e0c980525a 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -44,6 +44,16 @@ type adaptiveThrottler interface { RegisterBackendResponse(throttled bool) } +// newConnectivityStateSubscriber is a variable that can be overridden in tests +// to wrap the connectivity state subscriber for testing purposes. +var newConnectivityStateSubscriber = connStateSubscriber + +// connStateSubscriber returns the subscriber as-is. This function can be +// overridden in tests to wrap the subscriber. +func connStateSubscriber(sub grpcsync.Subscriber) grpcsync.Subscriber { + return sub +} + // controlChannel is a wrapper around the gRPC channel to the RLS server // specified in the service config. type controlChannel struct { @@ -57,12 +67,14 @@ type controlChannel struct { // hammering the RLS service while it is overloaded or down. throttler adaptiveThrottler - cc *grpc.ClientConn - client rlsgrpc.RouteLookupServiceClient - logger *internalgrpclog.PrefixLogger - unsubscribe func() - seenTransientFailure bool + cc *grpc.ClientConn + client rlsgrpc.RouteLookupServiceClient + logger *internalgrpclog.PrefixLogger + unsubscribe func() + + // All fields below are guarded by mu. 
mu sync.Mutex + seenTransientFailure bool } // newControlChannel creates a controlChannel to rlsServerName and uses @@ -86,7 +98,7 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura } // Subscribe to connectivity state before connecting to avoid missing initial // updates, which are only delivered to active subscribers. - ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh) + ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh)) ctrlCh.cc.Connect() ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) @@ -110,13 +122,17 @@ func (cc *controlChannel) OnMessage(msg any) { // proceed immediately. We skip benign transitions like READY → IDLE → READY // since those don't represent actual failures. if cc.seenTransientFailure { - cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + if cc.logger.V(2) { + cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + } cc.seenTransientFailure = false if cc.backToReadyFunc != nil { cc.backToReadyFunc() } } else { - cc.logger.Infof("Control channel is READY") + if cc.logger.V(2) { + cc.logger.Infof("Control channel is READY") + } } case connectivity.TransientFailure: // Track that we've entered TRANSIENT_FAILURE state so we know to reset @@ -124,7 +140,9 @@ func (cc *controlChannel) OnMessage(msg any) { cc.logger.Warningf("Control channel is TRANSIENT_FAILURE") cc.seenTransientFailure = true default: - cc.logger.Infof("Control channel connectivity state is %s", st) + if cc.logger.V(2) { + cc.logger.Infof("Control channel connectivity state is %s", st) + } } } diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index e5b61d4ac0d1..acd278279d30 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -37,7 +37,9 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpcsync" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" + "google.golang.org/grpc/internal/testutils" rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -466,123 +468,256 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { } } -// TestControlChannelConnectivityStateTransitions verifies that the control -// channel only resets backoff when recovering from TRANSIENT_FAILURE, not -// when going through benign state changes like READY → IDLE → READY. 
-func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { - tests := []struct { - name string - description string - states []connectivity.State - wantCallbackCount int - }{ - { - name: "ready_after_transient_failure", - description: "ready after transient failure triggers callback to reset the timer.", - states: []connectivity.State{ - connectivity.TransientFailure, - connectivity.Ready, - }, - wantCallbackCount: 1, - }, - { - name: "ready_after_idle", - description: "ready after idle does not trigger callback", - states: []connectivity.State{ - connectivity.Idle, - connectivity.Ready, - }, - wantCallbackCount: 0, - }, - { - name: "multiple_failures", - description: "multiple failures trigger callback each time", - states: []connectivity.State{ - connectivity.TransientFailure, - connectivity.Ready, - connectivity.TransientFailure, - connectivity.Ready, - }, - wantCallbackCount: 2, - }, - { - name: "idle_between_failures", - description: "idle between failures doesn't affect callback", - states: []connectivity.State{ - connectivity.TransientFailure, - connectivity.Idle, - connectivity.Ready, - }, - wantCallbackCount: 1, - }, +// wrappingConnectivityStateSubscriber wraps a connectivity state subscriber +// and exposes state changes to tests via a channel. +type wrappingConnectivityStateSubscriber struct { + delegate grpcsync.Subscriber + connStateCh chan connectivity.State +} + +func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) { + w.delegate.OnMessage(msg) + w.connStateCh <- msg.(connectivity.State) +} + +// TestControlChannelConnectivityStateTransitions_TransientFailure verifies that +// the control channel resets backoff when recovering from TRANSIENT_FAILURE. +// It stops the RLS server to trigger TRANSIENT_FAILURE, then restarts it and +// verifies that backoff is reset when the channel becomes READY again. +func (s) TestControlChannelConnectivityStateTransitions_TransientFailure(t *testing.T) { + // Create a restartable listener for the RLS server. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + // Start an RLS server with the restartable listener. + rlsServer, _ := rlstest.SetupFakeRLSServer(t, lis) + + // Override the connectivity state subscriber to wrap it for testing. + wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)} + origConnectivityStateSubscriber := newConnectivityStateSubscriber + newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { + wrappedSubscriber.delegate = delegate + return wrappedSubscriber + } + defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() + + // Setup callback to track invocations. + var mu sync.Mutex + var callbackCount int + callback := func() { + mu.Lock() + callbackCount++ + mu.Unlock() + } + + // Create control channel. + ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) + if err != nil { + t.Fatalf("Failed to create control channel: %v", err) + } + defer ctrlCh.close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Verify that the control channel moves to READY. 
+ wantStates := []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + } + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) + } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } + + // Verify no callbacks have been invoked yet (initial READY doesn't trigger callback). + mu.Lock() + if callbackCount != 0 { + mu.Unlock() + t.Fatalf("Got %d callback invocations for initial READY, want 0", callbackCount) } + mu.Unlock() - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Start an RLS server - rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) - - // Setup callback to count invocations - var mu sync.Mutex - var callbackCount int - // Buffered channel large enough to never block - callbackInvoked := make(chan struct{}, 100) - callback := func() { - mu.Lock() - callbackCount++ - mu.Unlock() - // Send to channel - should never block with large buffer - callbackInvoked <- struct{}{} + // Stop the RLS server to trigger TRANSIENT_FAILURE. + lis.Stop() + + // Verify that the control channel moves to IDLE. + wantStates = []connectivity.State{ + connectivity.Idle, + } + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } + + // Trigger a reconnection attempt by making a lookup (which will fail). + // This should cause the channel to attempt to reconnect and move to TRANSIENT_FAILURE. + ctrlCh.lookup(nil, rlspb.RouteLookupRequest_REASON_MISS, "", func(_ []string, _ string, _ error) {}) - // Create control channel - ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) - if err != nil { - t.Fatalf("Failed to create control channel: %v", err) + // Verify that the control channel moves to TRANSIENT_FAILURE. + wantStates = []connectivity.State{ + connectivity.Connecting, + connectivity.TransientFailure, + } + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) } - defer ctrlCh.close() + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } - // Inject all test states - for _, state := range tt.states { - ctrlCh.OnMessage(state) + // Restart the RLS server. + lis.Restart() + + // The control channel should eventually reconnect and move to READY. + // This transition from TRANSIENT_FAILURE → READY should trigger the callback. + // We drain states until we see READY, as the channel may go through intermediate + // states (CONNECTING) very quickly after restart. + for { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState == connectivity.Ready { + goto ready } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become READY") + } + } +ready: + + // Verify that the callback was invoked exactly once (for TRANSIENT_FAILURE → READY). 
+ mu.Lock() + got := callbackCount + mu.Unlock() + if got != 1 { + t.Fatalf("Got %d callback invocations, want 1", got) + } +} + +// TestControlChannelConnectivityStateTransitions_IdleDoesNotTriggerCallback +// verifies that IDLE → READY transitions do not trigger backoff reset callbacks. +func (s) TestControlChannelConnectivityStateTransitions_IdleDoesNotTriggerCallback(t *testing.T) { + // Create a restartable listener for the RLS server. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + // Start an RLS server with the restartable listener. + rlsServer, _ := rlstest.SetupFakeRLSServer(t, lis) + + // Override the connectivity state subscriber to wrap it for testing. + wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)} + origConnectivityStateSubscriber := newConnectivityStateSubscriber + newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { + wrappedSubscriber.delegate = delegate + return wrappedSubscriber + } + defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() + + // Setup callback to track invocations. + var mu sync.Mutex + var callbackCount int + callback := func() { + mu.Lock() + callbackCount++ + mu.Unlock() + } - // Wait for all expected callbacks with timeout - callbackTimeout := time.NewTimer(defaultTestTimeout) - defer callbackTimeout.Stop() - - receivedCallbacks := 0 - for receivedCallbacks < tt.wantCallbackCount { - select { - case <-callbackInvoked: - receivedCallbacks++ - case <-callbackTimeout.C: - mu.Lock() - got := callbackCount - mu.Unlock() - t.Fatalf("Timeout waiting for callbacks: expected %d, received %d via channel, callback count is %d", tt.wantCallbackCount, receivedCallbacks, got) - } + // Create control channel. + ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) + if err != nil { + t.Fatalf("Failed to create control channel: %v", err) + } + defer ctrlCh.close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Verify that the control channel moves to READY. + wantStates := []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + } + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } - // Verify final callback count matches expected - mu.Lock() - gotCallbackCount := callbackCount - mu.Unlock() + // Stop the RLS server (without triggering TRANSIENT_FAILURE first). + lis.Stop() - if gotCallbackCount != tt.wantCallbackCount { - t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) + // Verify that the control channel moves to IDLE. 
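	// Note: merely stopping the listener moves the channel to IDLE; the
	// channel enters TRANSIENT_FAILURE only after a reconnect attempt fails,
	// which is why the previous test had to issue a lookup to reach that state.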
+ wantStates = []connectivity.State{ + connectivity.Idle, + } + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } - // Ensure no extra callbacks are invoked - select { - case <-callbackInvoked: - mu.Lock() - final := callbackCount - mu.Unlock() - t.Fatalf("Received more callbacks than expected: got %d, want %d", final, tt.wantCallbackCount) - case <-time.After(50 * time.Millisecond): - // Expected: no more callbacks + // Restart the RLS server before the channel goes to TRANSIENT_FAILURE. + lis.Restart() + + // Trigger a reconnection by making a lookup. + ctrlCh.lookup(nil, rlspb.RouteLookupRequest_REASON_MISS, "", func(_ []string, _ string, _ error) {}) + + // The control channel should reconnect and move to READY. + // This transition from IDLE → READY should NOT trigger the callback. + // We drain states until we see READY, as the channel may go through intermediate + // states (CONNECTING) very quickly. + for { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState == connectivity.Ready { + goto idleready } - }) + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become READY") + } + } +idleready: + + // Wait a bit to ensure no callback is triggered. + time.Sleep(100 * time.Millisecond) + + // Verify that the callback was never invoked (IDLE → READY doesn't trigger callback). + mu.Lock() + got := callbackCount + mu.Unlock() + if got != 0 { + t.Fatalf("Got %d callback invocations for IDLE → READY, want 0", got) } } From 1cf1ad8a3f3b8e5c5050f68b0b206fc56ea488f0 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Mon, 15 Dec 2025 21:37:47 +0300 Subject: [PATCH 10/13] fix --- balancer/rls/control_channel_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index acd278279d30..5c7621c039f1 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -710,10 +710,9 @@ func (s) TestControlChannelConnectivityStateTransitions_IdleDoesNotTriggerCallba } idleready: - // Wait a bit to ensure no callback is triggered. - time.Sleep(100 * time.Millisecond) - // Verify that the callback was never invoked (IDLE → READY doesn't trigger callback). + // We check immediately after observing the READY state - if the callback was going + // to be invoked, it would have happened during the state transition processing. 
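	// The ordering relied on here comes from wrappingConnectivityStateSubscriber:
	// OnMessage invokes the delegate (and therefore any backoff-reset callback)
	// before forwarding the state on connStateCh, so by the time READY is read
	// above, callbackCount is already final for that transition.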
mu.Lock() got := callbackCount mu.Unlock() if got != 0 { t.Fatalf("Got %d callback invocations for IDLE → READY, want 0", got) } } From 2eb65c34c68ed9aa710ae90834d873581c15546f Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Thu, 18 Dec 2025 16:14:04 +0300 Subject: [PATCH 11/13] address easwars's comment --- balancer/rls/balancer_test.go | 14 +- balancer/rls/control_channel.go | 12 +- balancer/rls/control_channel_test.go | 256 --------------------------- 3 files changed, 14 insertions(+), 268 deletions(-) diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index fd53b731defd..fa4373753b5d 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -912,7 +912,9 @@ func (s) TestDataCachePurging(t *testing.T) { // TestControlChannelConnectivityStateMonitoring tests the scenario where the // control channel goes down and comes back up again and verifies that backoff -state is reset for cache entries in this scenario. +state is reset for cache entries in this scenario. It also verifies that +backoff is NOT reset when the control channel first becomes READY (i.e., the +initial CONNECTING → READY transition should not trigger a backoff reset). func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // Create a restartable listener which can close existing connections. l, err := testutils.LocalTCPListener() @@ -973,6 +975,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // Make sure an RLS request is sent out. verifyRLSRequest(t, rlsReqCh, true) + // Verify that the initial READY state of the control channel did NOT trigger + // a backoff reset. The resetBackoffHook should only be called when + // transitioning from TRANSIENT_FAILURE to READY, not for the initial + // CONNECTING → READY transition. + select { + case <-resetBackoffDone: + t.Fatal("Backoff reset was triggered for initial READY state, want no reset") + default: + } + // Stop the RLS server. lis.Stop() diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index e8e0c980525a..205534fab0e1 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -44,16 +44,6 @@ type adaptiveThrottler interface { RegisterBackendResponse(throttled bool) } -// newConnectivityStateSubscriber is a variable that can be overridden in tests -// to wrap the connectivity state subscriber for testing purposes. -var newConnectivityStateSubscriber = connStateSubscriber - -// connStateSubscriber returns the subscriber as-is. This function can be -// overridden in tests to wrap the subscriber. -func connStateSubscriber(sub grpcsync.Subscriber) grpcsync.Subscriber { - return sub -} - // controlChannel is a wrapper around the gRPC channel to the RLS server // specified in the service config. type controlChannel struct { @@ -98,7 +88,7 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura } // Subscribe to connectivity state before connecting to avoid missing initial // updates, which are only delivered to active subscribers.
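	// A hypothetical sketch of the ordering this comment describes (subscribe
	// is a stand-in name, not the actual API):
	//
	//	unsubscribe := subscribe(cc, subscriber) // register first
	//	cc.Connect()                             // initial CONNECTING/READY updates are now observed
	//	defer unsubscribe()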
- ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh)) + ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh) ctrlCh.cc.Connect() ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 5c7621c039f1..28558951c420 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -26,7 +26,6 @@ import ( "fmt" "os" "regexp" - "sync" "testing" "time" @@ -34,12 +33,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/grpcsync" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" - "google.golang.org/grpc/internal/testutils" rlstest "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -468,255 +464,3 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { } } -// wrappingConnectivityStateSubscriber wraps a connectivity state subscriber -// and exposes state changes to tests via a channel. -type wrappingConnectivityStateSubscriber struct { - delegate grpcsync.Subscriber - connStateCh chan connectivity.State -} - -func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) { - w.delegate.OnMessage(msg) - w.connStateCh <- msg.(connectivity.State) -} - -// TestControlChannelConnectivityStateTransitions_TransientFailure verifies that -// the control channel resets backoff when recovering from TRANSIENT_FAILURE. -// It stops the RLS server to trigger TRANSIENT_FAILURE, then restarts it and -// verifies that backoff is reset when the channel becomes READY again. -func (s) TestControlChannelConnectivityStateTransitions_TransientFailure(t *testing.T) { - // Create a restartable listener for the RLS server. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("net.Listen() failed: %v", err) - } - lis := testutils.NewRestartableListener(l) - - // Start an RLS server with the restartable listener. - rlsServer, _ := rlstest.SetupFakeRLSServer(t, lis) - - // Override the connectivity state subscriber to wrap it for testing. - wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)} - origConnectivityStateSubscriber := newConnectivityStateSubscriber - newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { - wrappedSubscriber.delegate = delegate - return wrappedSubscriber - } - defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() - - // Setup callback to track invocations. - var mu sync.Mutex - var callbackCount int - callback := func() { - mu.Lock() - callbackCount++ - mu.Unlock() - } - - // Create control channel. - ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) - if err != nil { - t.Fatalf("Failed to create control channel: %v", err) - } - defer ctrlCh.close() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Verify that the control channel moves to READY. 
- wantStates := []connectivity.State{ - connectivity.Connecting, - connectivity.Ready, - } - for _, wantState := range wantStates { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState != wantState { - t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) - } - } - - // Verify no callbacks have been invoked yet (initial READY doesn't trigger callback). - mu.Lock() - if callbackCount != 0 { - mu.Unlock() - t.Fatalf("Got %d callback invocations for initial READY, want 0", callbackCount) - } - mu.Unlock() - - // Stop the RLS server to trigger TRANSIENT_FAILURE. - lis.Stop() - - // Verify that the control channel moves to IDLE. - wantStates = []connectivity.State{ - connectivity.Idle, - } - for _, wantState := range wantStates { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState != wantState { - t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) - } - } - - // Trigger a reconnection attempt by making a lookup (which will fail). - // This should cause the channel to attempt to reconnect and move to TRANSIENT_FAILURE. - ctrlCh.lookup(nil, rlspb.RouteLookupRequest_REASON_MISS, "", func(_ []string, _ string, _ error) {}) - - // Verify that the control channel moves to TRANSIENT_FAILURE. - wantStates = []connectivity.State{ - connectivity.Connecting, - connectivity.TransientFailure, - } - for _, wantState := range wantStates { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState != wantState { - t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) - } - } - - // Restart the RLS server. - lis.Restart() - - // The control channel should eventually reconnect and move to READY. - // This transition from TRANSIENT_FAILURE → READY should trigger the callback. - // We drain states until we see READY, as the channel may go through intermediate - // states (CONNECTING) very quickly after restart. - for { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState == connectivity.Ready { - goto ready - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become READY") - } - } -ready: - - // Verify that the callback was invoked exactly once (for TRANSIENT_FAILURE → READY). - mu.Lock() - got := callbackCount - mu.Unlock() - if got != 1 { - t.Fatalf("Got %d callback invocations, want 1", got) - } -} - -// TestControlChannelConnectivityStateTransitions_IdleDoesNotTriggerCallback -// verifies that IDLE → READY transitions do not trigger backoff reset callbacks. -func (s) TestControlChannelConnectivityStateTransitions_IdleDoesNotTriggerCallback(t *testing.T) { - // Create a restartable listener for the RLS server. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("net.Listen() failed: %v", err) - } - lis := testutils.NewRestartableListener(l) - - // Start an RLS server with the restartable listener. - rlsServer, _ := rlstest.SetupFakeRLSServer(t, lis) - - // Override the connectivity state subscriber to wrap it for testing. 
- wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)} - origConnectivityStateSubscriber := newConnectivityStateSubscriber - newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { - wrappedSubscriber.delegate = delegate - return wrappedSubscriber - } - defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() - - // Setup callback to track invocations. - var mu sync.Mutex - var callbackCount int - callback := func() { - mu.Lock() - callbackCount++ - mu.Unlock() - } - - // Create control channel. - ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) - if err != nil { - t.Fatalf("Failed to create control channel: %v", err) - } - defer ctrlCh.close() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Verify that the control channel moves to READY. - wantStates := []connectivity.State{ - connectivity.Connecting, - connectivity.Ready, - } - for _, wantState := range wantStates { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState != wantState { - t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) - } - } - - // Stop the RLS server (without triggering TRANSIENT_FAILURE first). - lis.Stop() - - // Verify that the control channel moves to IDLE. - wantStates = []connectivity.State{ - connectivity.Idle, - } - for _, wantState := range wantStates { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState != wantState { - t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) - } - } - - // Restart the RLS server before the channel goes to TRANSIENT_FAILURE. - lis.Restart() - - // Trigger a reconnection by making a lookup. - ctrlCh.lookup(nil, rlspb.RouteLookupRequest_REASON_MISS, "", func(_ []string, _ string, _ error) {}) - - // The control channel should reconnect and move to READY. - // This transition from IDLE → READY should NOT trigger the callback. - // We drain states until we see READY, as the channel may go through intermediate - // states (CONNECTING) very quickly. - for { - select { - case gotState := <-wrappedSubscriber.connStateCh: - if gotState == connectivity.Ready { - goto idleready - } - case <-ctx.Done(): - t.Fatalf("Timeout waiting for RLS control channel to become READY") - } - } -idleready: - - // Verify that the callback was never invoked (IDLE → READY doesn't trigger callback). - // We check immediately after observing the READY state - if the callback was going - // to be invoked, it would have happened during the state transition processing. 
-	mu.Lock()
-	got := callbackCount
-	mu.Unlock()
-	if got != 0 {
-		t.Fatalf("Got %d callback invocations for IDLE → READY, want 0", got)
-	}
-}

From d044d4d3fe235287a9df254da6903bd57ad3ae3b Mon Sep 17 00:00:00 2001
From: ulascansenturk
Date: Fri, 19 Dec 2025 11:13:19 +0300
Subject: [PATCH 12/13] fixes

---
 balancer/rls/balancer_test.go        | 224 ++++++++++++++++++++++++++-
 balancer/rls/control_channel.go      |   8 +-
 balancer/rls/control_channel_test.go |   1 -
 3 files changed, 228 insertions(+), 5 deletions(-)

diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index fa4373753b5d..572ec2dd5844 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -38,6 +38,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/stub"
+	"google.golang.org/grpc/internal/grpcsync"
 	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
 	"google.golang.org/grpc/internal/testutils"
 	rlstest "google.golang.org/grpc/internal/testutils/rls"
@@ -910,11 +911,25 @@ func (s) TestDataCachePurging(t *testing.T) {
 	verifyRLSRequest(t, rlsReqCh, true)
 }
 
+// wrappingConnectivityStateSubscriber wraps a grpcsync.Subscriber and forwards
+// connectivity state updates to both the delegate and a channel for testing.
+type wrappingConnectivityStateSubscriber struct {
+	delegate    grpcsync.Subscriber
+	connStateCh chan connectivity.State
+}
+
+func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
+	w.delegate.OnMessage(msg)
+	w.connStateCh <- msg.(connectivity.State)
+}
+
 // TestControlChannelConnectivityStateMonitoring tests the scenario where the
 // control channel goes down and comes back up again and verifies that backoff
-// state is reset for cache entries in this scenario. It also verifies that
-// backoff is NOT reset when the control channel first becomes READY (i.e., the
-// initial CONNECTING → READY transition should not trigger a backoff reset).
+// state is reset for cache entries in this scenario. It also verifies that:
+// - Backoff is NOT reset when the control channel first becomes READY (i.e.,
+//   the initial CONNECTING → READY transition should not trigger a backoff reset)
+// - Backoff is NOT reset for READY → IDLE → READY transitions (benign state changes)
+// - Backoff IS reset for READY → TRANSIENT_FAILURE → READY transitions
 func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Create a restartable listener which can close existing connections.
 	l, err := testutils.LocalTCPListener()
@@ -941,6 +956,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
 	defer func() { defaultBackoffStrategy = origBackoffStrategy }()
 
+	// Override the connectivity state subscriber to wrap the original and
+	// make connectivity state changes visible to the test.
+	wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)}
+	origConnectivityStateSubscriber := newConnectivityStateSubscriber
+	newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
+		wrappedSubscriber.delegate = delegate
+		return wrappedSubscriber
+	}
+	defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
+
 	// Register an LB policy to act as the child policy for RLS LB policy.
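+	// The policy name is suffixed with t.Name() so that each test registers
+	// a distinct child policy and does not clash with policies registered by
+	// other tests in the same binary.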
childPolicyName := "test-child-policy" + t.Name() e2e.RegisterRLSChildPolicy(childPolicyName, nil) @@ -975,6 +1000,19 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // Make sure an RLS request is sent out. verifyRLSRequest(t, rlsReqCh, true) + // Verify that the control channel moves to READY. + wantStates := []connectivity.State{connectivity.Connecting, connectivity.Ready} + for _, wantState := range wantStates { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState != wantState { + t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState) + } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState) + } + } + // Verify that the initial READY state of the control channel did NOT trigger // a backoff reset. The resetBackoffHook should only be called when // transitioning from TRANSIENT_FAILURE to READY, not for the initial @@ -996,6 +1034,21 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { // of the test. makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) + // Wait for the control channel to move to TRANSIENT_FAILURE. When the server + // is stopped, we expect the control channel to go through Connecting and + // eventually reach TransientFailure. + for { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState == connectivity.TransientFailure { + goto transientFailureReached + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for RLS control channel to become TRANSIENT_FAILURE") + } + } +transientFailureReached: + // Restart the RLS server. lis.Restart() @@ -1013,6 +1066,20 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh) + // Wait for the control channel to move back to READY. + for { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState == connectivity.Ready { + goto readyAfterTransientFailure + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for RLS control channel to become READY after TRANSIENT_FAILURE") + } + } +readyAfterTransientFailure: + + // Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY. select { case <-ctx.Done(): t.Fatalf("Timed out waiting for resetBackoffDone") @@ -1028,6 +1095,157 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { verifyRLSRequest(t, rlsReqCh, true) } +// TestControlChannelIdleTransitionNoBackoffReset tests that READY → IDLE → READY +// transitions do not trigger backoff resets. This is a benign state change that +// should not affect cache entry backoff state. +func (s) TestControlChannelIdleTransitionNoBackoffReset(t *testing.T) { + // Create a restartable listener which can close existing connections. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + // Start an RLS server with the restartable listener and set the throttler to + // never throttle requests. + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) + overrideAdaptiveThrottler(t, neverThrottlingThrottler()) + + // Override the reset backoff hook to get notified. 
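+	// The hook below replaces resetBackoffHook, which is invoked whenever
+	// backoff state is reset for cache entries. A buffered channel is used so
+	// that a single invocation does not block, and the test can later assert
+	// that no reset happened by checking that the channel is still empty.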
+ resetBackoffCalled := make(chan struct{}, 1) + origResetBackoffHook := resetBackoffHook + resetBackoffHook = func() { resetBackoffCalled <- struct{}{} } + defer func() { resetBackoffHook = origResetBackoffHook }() + + // Override the backoff strategy to return a large backoff which + // will make sure the data cache entry remains in backoff for the + // duration of the test. + origBackoffStrategy := defaultBackoffStrategy + defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout} + defer func() { defaultBackoffStrategy = origBackoffStrategy }() + + // Override the connectivity state subscriber to wrap the original and + // make connectivity state changes visible to the test. + wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 10)} + origConnectivityStateSubscriber := newConnectivityStateSubscriber + newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { + wrappedSubscriber.delegate = delegate + return wrappedSubscriber + } + defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() + + // Register an LB policy to act as the child policy for RLS LB policy. + childPolicyName := "test-child-policy" + t.Name() + e2e.RegisterRLSChildPolicy(childPolicyName, nil) + t.Logf("Registered child policy with name %q", childPolicyName) + + // Build RLS service config with header matchers, and a very low value for + // maxAge to ensure that cache entries become invalid very soon. + rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address) + rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout) + + // Start a test backend, and set up the fake RLS server to return this as a + // target in the RLS response. + backendCh, backendAddress := startBackend(t) + rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} + }) + + // Register a manual resolver and push the RLS service config through it. + r := startManualResolverWithConfig(t, rlsConfig) + + cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create gRPC client: %v", err) + } + defer cc.Close() + + // Make an RPC and ensure it gets routed to the test backend. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh) + + // Make sure an RLS request is sent out. + verifyRLSRequest(t, rlsReqCh, true) + + // Wait for the control channel to move to READY. + for { + select { + case gotState := <-wrappedSubscriber.connStateCh: + if gotState == connectivity.Ready { + goto initialReadyReached + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for RLS control channel to become READY") + } + } +initialReadyReached: + + // Verify that the initial READY state did NOT trigger a backoff reset. + select { + case <-resetBackoffCalled: + t.Fatal("Backoff reset was triggered for initial READY state, want no reset") + default: + } + + // Stop the RLS server to force the control channel to go IDLE. We use Stop() + // which closes connections gracefully, allowing the channel to transition + // to IDLE rather than TRANSIENT_FAILURE. + lis.Stop() + + // Wait for the control channel to move to IDLE. 
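+	// Drain states from the wrapped subscriber until IDLE is observed; the
+	// channel may deliver intermediate states before the control channel
+	// settles.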
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Idle {
+				goto idleReached
+			}
+			// This test exercises the READY → IDLE → READY path specifically.
+			// If the channel reaches TRANSIENT_FAILURE before IDLE, this run
+			// cannot exercise that path, so skip the test; the
+			// TRANSIENT_FAILURE → READY case is covered by
+			// TestControlChannelConnectivityStateMonitoring.
+			if gotState == connectivity.TransientFailure {
+				t.Skip("Control channel went to TRANSIENT_FAILURE instead of IDLE; skipping IDLE transition test")
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become IDLE")
+		}
+	}
+idleReached:
+
+	// Restart the RLS server.
+	lis.Restart()
+
+	// Make another RPC to trigger reconnection. Use different headers to create
+	// a new cache entry, and switch to a short backoff so that this new entry
+	// is not stuck in a long backoff for the remainder of the test.
+	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
+	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
+
+	// Wait for the control channel to move back to READY.
+	for {
+		select {
+		case gotState := <-wrappedSubscriber.connStateCh:
+			if gotState == connectivity.Ready {
+				goto readyAfterIdle
+			}
+		case <-ctx.Done():
+			t.Fatal("Timeout waiting for RLS control channel to become READY after IDLE")
+		}
+	}
+readyAfterIdle:
+
+	// Verify that the READY → IDLE → READY transition did NOT trigger a backoff
+	// reset. This is the key assertion of this test.
+	select {
+	case <-resetBackoffCalled:
+		t.Fatal("Backoff reset was triggered for READY → IDLE → READY transition, want no reset")
+	default:
+		// Good - no backoff reset was triggered for this benign transition.
+	}
+}
+
 // testCCWrapper wraps a balancer.ClientConn and overrides UpdateState and
 // stores all state updates pushed by the RLS LB policy.
 type testCCWrapper struct {
diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go
index 205534fab0e1..b9dc2e9f7537 100644
--- a/balancer/rls/control_channel.go
+++ b/balancer/rls/control_channel.go
@@ -39,6 +39,12 @@ import (
 
 var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
 
+// newConnectivityStateSubscriber is a variable that can be overridden in tests
+// to wrap the connectivity state subscriber for testing purposes.
+var newConnectivityStateSubscriber = func(sub grpcsync.Subscriber) grpcsync.Subscriber {
+	return sub
+}
+
 type adaptiveThrottler interface {
 	ShouldThrottle() bool
 	RegisterBackendResponse(throttled bool)
@@ -88,7 +94,7 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura
 	}
 	// Subscribe to connectivity state before connecting to avoid missing initial
 	// updates, which are only delivered to active subscribers.
-	ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh)
+	ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh))
 	ctrlCh.cc.Connect()
 	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
 	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go
index 28558951c420..5a30820c3b47 100644
--- a/balancer/rls/control_channel_test.go
+++ b/balancer/rls/control_channel_test.go
@@ -463,4 +463,3 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
 		t.Fatal("newControlChannel succeeded when expected to fail")
 	}
 }
-

From af8497ccdab95f553ab845549d85c12dcf7b1b46 Mon Sep 17 00:00:00 2001
From: ulascansenturk
Date: Thu, 15 Jan 2026 19:39:41 +0300
Subject: [PATCH 13/13] address comments

---
 balancer/rls/balancer_test.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index 572ec2dd5844..b2949ed92824 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -1020,7 +1020,7 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	select {
 	case <-resetBackoffDone:
 		t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
-	default:
+	case <-time.After(10 * time.Millisecond):
 	}
 
 	// Stop the RLS server.
@@ -1037,17 +1037,17 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Wait for the control channel to move to TRANSIENT_FAILURE. When the server
 	// is stopped, we expect the control channel to go through Connecting and
 	// eventually reach TransientFailure.
+transientFailureLoop:
 	for {
 		select {
 		case gotState := <-wrappedSubscriber.connStateCh:
 			if gotState == connectivity.TransientFailure {
-				goto transientFailureReached
+				break transientFailureLoop
 			}
 		case <-ctx.Done():
 			t.Fatal("Timeout waiting for RLS control channel to become TRANSIENT_FAILURE")
 		}
 	}
-transientFailureReached:
 
 	// Restart the RLS server.
 	lis.Restart()
@@ -1067,17 +1067,17 @@ transientFailureReached:
 	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
 
 	// Wait for the control channel to move back to READY.
+readyAfterTransientFailureLoop:
 	for {
 		select {
 		case gotState := <-wrappedSubscriber.connStateCh:
 			if gotState == connectivity.Ready {
-				goto readyAfterTransientFailure
+				break readyAfterTransientFailureLoop
 			}
 		case <-ctx.Done():
 			t.Fatal("Timeout waiting for RLS control channel to become READY after TRANSIENT_FAILURE")
 		}
 	}
-readyAfterTransientFailure:
 
 	// Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY.
 	select {