Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, fmt, future::Future, iter};
use std::{
collections::BTreeSet,
fmt,
future::Future,
iter,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};

use futures::{future, stream, StreamExt};
use linera_base::{
Expand Down Expand Up @@ -279,7 +288,8 @@ impl ValidatorNode for GrpcClient {
async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
let retry_delay = self.retry_delay;
let max_retries = self.max_retries;
let mut retry_count = 0;
// Use shared atomic counter so unfold can reset it on successful reconnection.
let retry_count = Arc::new(AtomicU32::new(0));
let subscription_request = SubscriptionRequest {
chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
};
Expand All @@ -298,17 +308,24 @@ impl ValidatorNode for GrpcClient {

// A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
// `client.subscribe(request)` endlessly and without delay.
let retry_count_for_unfold = retry_count.clone();
let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
let mut client = client.clone();
let subscription_request = subscription_request.clone();
let mut stream = stream.take();
let retry_count = retry_count_for_unfold.clone();
async move {
let stream = if let Some(stream) = stream.take() {
future::Either::Right(stream)
} else {
match client.subscribe(subscription_request.clone()).await {
Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
Ok(response) => future::Either::Right(response.into_inner()),
Ok(response) => {
// Reset retry count on successful reconnection.
retry_count.store(0, Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the store below already take care of that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not if it's an idle stream! And I think this was the bug. If we have a busy stream with notifications coming in all the time, then what we were doing below was probably enough. But if the stream is idle, we can reconnect successfully even when there are no new notifications flowing through, so the reset below never runs. We also need to make sure we zero the retry count in that case.

trace!("Successfully reconnected subscription stream");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it say "reconnected" now also on the first attempt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as far as I understand, the first attempt is when we initially create the stream on L299. This is just for reconnections.

future::Either::Right(response.into_inner())
}
}
};
Some((stream, ()))
Expand All @@ -328,15 +345,18 @@ impl ValidatorNode for GrpcClient {
})
.take_while(move |result| {
let Err(status) = result else {
retry_count = 0;
retry_count.store(0, Ordering::Relaxed);
return future::Either::Left(future::ready(true));
};

if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
let current_retry_count = retry_count.load(Ordering::Relaxed);
if !span.in_scope(|| Self::is_retryable(status))
|| current_retry_count >= max_retries
{
return future::Either::Left(future::ready(false));
}
let delay = retry_delay.saturating_mul(retry_count);
retry_count += 1;
let delay = retry_delay.saturating_mul(current_retry_count);
retry_count.fetch_add(1, Ordering::Relaxed);
future::Either::Right(async move {
linera_base::time::timer::sleep(delay).await;
true
Expand Down
Loading