ndr-ds commented Feb 6, 2026

## Motivation

Long-lived gRPC subscription streams would silently terminate after
accumulating `max_retries` errors over time, even when each individual
reconnection was successful. This caused workers to stop receiving
notifications and go silent.

## Proposal

Change `retry_count` from a local variable to a shared `Arc<AtomicU32>`
that can be reset from within the `unfold` closure when
`client.subscribe()` succeeds.

Previously, `retry_count` only reset when a successful notification was
received. Now it also resets when a reconnection succeeds, so
`max_retries` applies to consecutive failed reconnection attempts rather
than total errors over the stream's lifetime.
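
A minimal, standard-library-only sketch of the shared-counter idea described above. It is not the code from this PR: `Attempt`, `make_reconnect_handler`, and the exact give-up condition (`failures < max_retries`) are assumptions made for illustration, and the closure only stands in for the one passed to `unfold`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical outcome of one reconnection attempt
/// (a stand-in for the result of `client.subscribe()`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Attempt {
    Connected,
    Failed,
}

/// Builds a closure that owns a clone of the shared counter, similar in
/// spirit to a closure handed to `unfold`. The closure returns `true`
/// while the stream should keep retrying.
pub fn make_reconnect_handler(
    retry_count: Arc<AtomicU32>,
    max_retries: u32,
) -> impl FnMut(Attempt) -> bool {
    move |attempt| match attempt {
        Attempt::Connected => {
            // Reset on successful reconnection, so only consecutive
            // failures count toward `max_retries`.
            retry_count.store(0, Ordering::Relaxed);
            true
        }
        Attempt::Failed => {
            // `fetch_add` returns the previous value; add 1 to get the
            // number of consecutive failures including this one.
            let failures = retry_count.fetch_add(1, Ordering::Relaxed) + 1;
            // Assumed policy: give up once `max_retries` is reached.
            failures < max_retries
        }
    }
}
```

Because the counter lives behind an `Arc`, code outside the closure can hold another clone and observe or reset the same value, which a plain local variable moved into the closure would not allow.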

## Test Plan

- CI
- Manual testing with PM benchmark that was experiencing the issue

Ok(response) => {
    // Reset retry count on successful reconnection.
    retry_count.store(0, Ordering::Relaxed);
    trace!("Successfully reconnected subscription stream");
Contributor

Does it say "reconnected" now also on the first attempt?

Contributor Author

No, AFAIU the first attempt is when we initially create the stream on L299. This is just for reconnections.

Ok(response) => future::Either::Right(response.into_inner()),
Ok(response) => {
    // Reset retry count on successful reconnection.
    retry_count.store(0, Ordering::Relaxed);
Contributor

Doesn't the store below already take care of that?

Contributor Author

Not if it's an idle stream! And I think this was the bug. If we have a busy stream with notifications coming in all the time, then what we were doing below was probably enough. But if the stream is idle, we can reconnect even if there are no new notifications flowing through. We also need to make sure we zero the retry count in that case.
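
To make the idle-stream case concrete, here is a small simulation, offered as a sketch rather than the actual stream logic. `Event`, `first_give_up`, and `idle_stream` are hypothetical names, the counter is a plain `u32` instead of the `Arc<AtomicU32>` used in the PR, and the model assumes that every stream error increments the counter and that the stream gives up once the counter reaches `max_retries`.

```rust
/// Hypothetical events observed on a subscription stream.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Event {
    /// The stream errored and has to reconnect.
    StreamError,
    /// A reconnection attempt succeeded.
    Reconnected,
    /// A notification was delivered.
    Notification,
}

/// Replays `events` and returns the index of the event at which the stream
/// gives up, or `None` if it survives the whole sequence.
/// `reset_on_reconnect = false` models the old behavior (reset only on a
/// notification); `true` models the behavior after this change.
pub fn first_give_up(
    events: &[Event],
    max_retries: u32,
    reset_on_reconnect: bool,
) -> Option<usize> {
    let mut retry_count = 0u32;
    for (index, event) in events.iter().enumerate() {
        match event {
            Event::StreamError => {
                retry_count += 1;
                if retry_count >= max_retries {
                    return Some(index);
                }
            }
            Event::Reconnected => {
                if reset_on_reconnect {
                    retry_count = 0;
                }
            }
            // Both behaviors reset when a notification arrives.
            Event::Notification => retry_count = 0,
        }
    }
    None
}

/// An idle stream: it errors and reconnects successfully `cycles` times,
/// with no notifications in between.
pub fn idle_stream(cycles: usize) -> Vec<Event> {
    let mut events = Vec::with_capacity(cycles * 2);
    for _ in 0..cycles {
        events.push(Event::StreamError);
        events.push(Event::Reconnected);
    }
    events
}
```

Under these assumptions, `first_give_up(&idle_stream(5), 3, false)` gives up at the third error even though every reconnection succeeded, while the same sequence with `reset_on_reconnect = true` survives. A busy stream, where a `Notification` follows each reconnection, survives under either setting, which matches the observation that the existing reset was enough for busy streams.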

ndr-ds added this pull request to the merge queue Feb 9, 2026
ndr-ds removed this pull request from the merge queue due to a manual request Feb 9, 2026
ndr-ds added this pull request to the merge queue Feb 9, 2026
Merged via the queue into main with commit df04c2c Feb 9, 2026
35 checks passed