diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index e68f90f34762..b578f6a1f37f 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -1,7 +1,16 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::BTreeSet, fmt, future::Future, iter}; +use std::{ + collections::BTreeSet, + fmt, + future::Future, + iter, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; use futures::{future, stream, StreamExt}; use linera_base::{ @@ -279,7 +288,8 @@ impl ValidatorNode for GrpcClient { async fn subscribe(&self, chains: Vec) -> Result { let retry_delay = self.retry_delay; let max_retries = self.max_retries; - let mut retry_count = 0; + // Use shared atomic counter so unfold can reset it on successful reconnection. + let retry_count = Arc::new(AtomicU32::new(0)); let subscription_request = SubscriptionRequest { chain_ids: chains.into_iter().map(|chain| chain.into()).collect(), }; @@ -298,17 +308,24 @@ impl ValidatorNode for GrpcClient { // A stream of `Result` that keeps calling // `client.subscribe(request)` endlessly and without delay. + let retry_count_for_unfold = retry_count.clone(); let endlessly_retrying_notification_stream = stream::unfold((), move |()| { let mut client = client.clone(); let subscription_request = subscription_request.clone(); let mut stream = stream.take(); + let retry_count = retry_count_for_unfold.clone(); async move { let stream = if let Some(stream) = stream.take() { future::Either::Right(stream) } else { match client.subscribe(subscription_request.clone()).await { Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))), - Ok(response) => future::Either::Right(response.into_inner()), + Ok(response) => { + // Reset retry count on successful reconnection. + retry_count.store(0, Ordering::Relaxed); + trace!("Successfully reconnected subscription stream"); + future::Either::Right(response.into_inner()) + } } }; Some((stream, ())) @@ -328,15 +345,18 @@ impl ValidatorNode for GrpcClient { }) .take_while(move |result| { let Err(status) = result else { - retry_count = 0; + retry_count.store(0, Ordering::Relaxed); return future::Either::Left(future::ready(true)); }; - if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries { + let current_retry_count = retry_count.load(Ordering::Relaxed); + if !span.in_scope(|| Self::is_retryable(status)) + || current_retry_count >= max_retries + { return future::Either::Left(future::ready(false)); } - let delay = retry_delay.saturating_mul(retry_count); - retry_count += 1; + let delay = retry_delay.saturating_mul(current_retry_count); + retry_count.fetch_add(1, Ordering::Relaxed); future::Either::Right(async move { linera_base::time::timer::sleep(delay).await; true