-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Fix gRPC subscription retry count to reset on reconnection (frontport of #5380) #5402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,16 @@ | ||
| // Copyright (c) Zefchain Labs, Inc. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| use std::{collections::BTreeSet, fmt, future::Future, iter}; | ||
| use std::{ | ||
| collections::BTreeSet, | ||
| fmt, | ||
| future::Future, | ||
| iter, | ||
| sync::{ | ||
| atomic::{AtomicU32, Ordering}, | ||
| Arc, | ||
| }, | ||
| }; | ||
|
|
||
| use futures::{future, stream, StreamExt}; | ||
| use linera_base::{ | ||
|
|
@@ -279,7 +288,8 @@ impl ValidatorNode for GrpcClient { | |
| async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> { | ||
| let retry_delay = self.retry_delay; | ||
| let max_retries = self.max_retries; | ||
| let mut retry_count = 0; | ||
| // Use shared atomic counter so unfold can reset it on successful reconnection. | ||
| let retry_count = Arc::new(AtomicU32::new(0)); | ||
| let subscription_request = SubscriptionRequest { | ||
| chain_ids: chains.into_iter().map(|chain| chain.into()).collect(), | ||
| }; | ||
|
|
@@ -298,17 +308,24 @@ impl ValidatorNode for GrpcClient { | |
|
|
||
| // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling | ||
| // `client.subscribe(request)` endlessly and without delay. | ||
| let retry_count_for_unfold = retry_count.clone(); | ||
| let endlessly_retrying_notification_stream = stream::unfold((), move |()| { | ||
| let mut client = client.clone(); | ||
| let subscription_request = subscription_request.clone(); | ||
| let mut stream = stream.take(); | ||
| let retry_count = retry_count_for_unfold.clone(); | ||
| async move { | ||
| let stream = if let Some(stream) = stream.take() { | ||
| future::Either::Right(stream) | ||
| } else { | ||
| match client.subscribe(subscription_request.clone()).await { | ||
| Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))), | ||
| Ok(response) => future::Either::Right(response.into_inner()), | ||
| Ok(response) => { | ||
| // Reset retry count on successful reconnection. | ||
| retry_count.store(0, Ordering::Relaxed); | ||
| trace!("Successfully reconnected subscription stream"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it say "reconnected" now also on the first attempt?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, AFAIU the first attempt is when we initially create the stream on L299. This is just for reconnections |
||
| future::Either::Right(response.into_inner()) | ||
| } | ||
| } | ||
| }; | ||
| Some((stream, ())) | ||
|
|
@@ -328,15 +345,18 @@ impl ValidatorNode for GrpcClient { | |
| }) | ||
| .take_while(move |result| { | ||
| let Err(status) = result else { | ||
| retry_count = 0; | ||
| retry_count.store(0, Ordering::Relaxed); | ||
| return future::Either::Left(future::ready(true)); | ||
| }; | ||
|
|
||
| if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries { | ||
| let current_retry_count = retry_count.load(Ordering::Relaxed); | ||
| if !span.in_scope(|| Self::is_retryable(status)) | ||
| || current_retry_count >= max_retries | ||
| { | ||
| return future::Either::Left(future::ready(false)); | ||
| } | ||
| let delay = retry_delay.saturating_mul(retry_count); | ||
| retry_count += 1; | ||
| let delay = retry_delay.saturating_mul(current_retry_count); | ||
| retry_count.fetch_add(1, Ordering::Relaxed); | ||
| future::Either::Right(async move { | ||
| linera_base::time::timer::sleep(delay).await; | ||
| true | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't the
storebelow already take care of that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not if it's an idle stream! And I think this was the bug. If we have a busy stream with notifications coming in all the time, then what we were doing below was probably enough. But if the stream is idle, we can reconnect even if there's no new notifications flowing through. We also need to make sure we zero the retry count in that case.