From 90699a5ce20c46559ee25535e012b9afa7bab86c Mon Sep 17 00:00:00 2001
From: Andre da Silva <2917611+ndr-ds@users.noreply.github.com>
Date: Wed, 4 Feb 2026 03:23:08 -0300
Subject: [PATCH] Fix gRPC subscription retry count to reset on reconnection
 (#5380)

## Motivation

Long-lived gRPC subscription streams would silently terminate after
accumulating `max_retries` errors over time, even when each individual
reconnection was successful. This caused workers to stop receiving
notifications and go silent.

## Proposal

Change `retry_count` from a local variable to a shared `Arc<AtomicU32>` that
can be reset from within the `unfold` closure when `client.subscribe()`
succeeds. Previously, `retry_count` was only reset when a successful
notification was received. Now it is also reset when a reconnection succeeds,
so `max_retries` applies to consecutive failed reconnection attempts rather
than to the total number of errors over the stream's lifetime.

## Test Plan

- CI
- Manual testing with the PM benchmark that was experiencing the issue
---
 linera-rpc/src/grpc/client.rs | 34 +++++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs
index e68f90f34762..b578f6a1f37f 100644
--- a/linera-rpc/src/grpc/client.rs
+++ b/linera-rpc/src/grpc/client.rs
@@ -1,7 +1,16 @@
 // Copyright (c) Zefchain Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{collections::BTreeSet, fmt, future::Future, iter};
+use std::{
+    collections::BTreeSet,
+    fmt,
+    future::Future,
+    iter,
+    sync::{
+        atomic::{AtomicU32, Ordering},
+        Arc,
+    },
+};
 
 use futures::{future, stream, StreamExt};
 use linera_base::{
@@ -279,7 +288,8 @@ impl ValidatorNode for GrpcClient {
     async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
         let retry_delay = self.retry_delay;
         let max_retries = self.max_retries;
-        let mut retry_count = 0;
+        // Use a shared atomic counter so `unfold` can reset it on successful reconnection.
+        let retry_count = Arc::new(AtomicU32::new(0));
         let subscription_request = SubscriptionRequest {
             chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
         };
@@ -298,17 +308,24 @@ impl ValidatorNode for GrpcClient {
 
         // A stream of `Result<Notification, tonic::Status>` that keeps calling
         // `client.subscribe(request)` endlessly and without delay.
+        let retry_count_for_unfold = retry_count.clone();
         let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
             let mut client = client.clone();
             let subscription_request = subscription_request.clone();
             let mut stream = stream.take();
+            let retry_count = retry_count_for_unfold.clone();
             async move {
                 let stream = if let Some(stream) = stream.take() {
                     future::Either::Right(stream)
                 } else {
                     match client.subscribe(subscription_request.clone()).await {
                         Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
-                        Ok(response) => future::Either::Right(response.into_inner()),
+                        Ok(response) => {
+                            // Reset retry count on successful reconnection.
+                            retry_count.store(0, Ordering::Relaxed);
+                            trace!("Successfully reconnected subscription stream");
+                            future::Either::Right(response.into_inner())
+                        }
                     }
                 };
                 Some((stream, ()))
@@ -328,15 +345,18 @@ impl ValidatorNode for GrpcClient {
             })
             .take_while(move |result| {
                 let Err(status) = result else {
-                    retry_count = 0;
+                    retry_count.store(0, Ordering::Relaxed);
                     return future::Either::Left(future::ready(true));
                 };
-                if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
+                let current_retry_count = retry_count.load(Ordering::Relaxed);
+                if !span.in_scope(|| Self::is_retryable(status))
+                    || current_retry_count >= max_retries
+                {
                     return future::Either::Left(future::ready(false));
                 }
-                let delay = retry_delay.saturating_mul(retry_count);
-                retry_count += 1;
+                let delay = retry_delay.saturating_mul(current_retry_count);
+                retry_count.fetch_add(1, Ordering::Relaxed);
                 future::Either::Right(async move {
                     linera_base::time::timer::sleep(delay).await;
                     true
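
A minimal standalone sketch of the counter-reset pattern the proposal describes, for illustration only: the simulated `outcomes` sequence and all names other than `retry_count`/`max_retries` are hypothetical, not code from this repository. It shows why resetting the shared counter on success makes `max_retries` bound consecutive failures rather than total failures over the stream's lifetime:

```rust
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};

fn main() {
    let max_retries = 3;
    // Shared counter, as in the patch: clones of the Arc let both the
    // reconnect path and the retry path read and reset the same value.
    let retry_count = Arc::new(AtomicU32::new(0));

    // Simulated reconnection outcomes over the stream's lifetime: five
    // failures in total (more than max_retries), but never three in a row.
    let outcomes = [false, false, true, false, false, true, false];

    for (attempt, succeeded) in outcomes.iter().enumerate() {
        if *succeeded {
            // Mirrors the store(0) the patch adds on successful reconnection.
            retry_count.store(0, Ordering::Relaxed);
            println!("attempt {attempt}: reconnected, counter reset to 0");
        } else {
            let current = retry_count.load(Ordering::Relaxed);
            if current >= max_retries {
                println!("attempt {attempt}: giving up");
                return;
            }
            retry_count.fetch_add(1, Ordering::Relaxed);
            println!("attempt {attempt}: failed, counter now {}", current + 1);
        }
    }
    // With lifetime-total counting (no reset on reconnect), the fourth
    // failure would already hit max_retries and end the stream; with resets
    // the counter never exceeds 2, so the stream survives every attempt.
    println!("stream survived all {} attempts", outcomes.len());
}
```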