feat(xds): implement xDS subscription worker #2478

Open

YutaoMa wants to merge 23 commits into hyperium:master from YutaoMa:yutaoma/xds-client-worker

Conversation

YutaoMa (Collaborator) commented Jan 12, 2026

Motivation

Ref: #2444

With the #2475 transport and codec changes merged, the remaining work required to get the xDS workflow running end to end is to wire them together with XdsClient through a worker loop. This PR implements that.

Solution

  1. Implement AdsWorker, a transport-, runtime-, and codegen-agnostic event loop for managing xDS subscriptions and the ADS stream.
  • The worker conceptually manages a pair of mpsc channels: the sender is used by XdsClient to send subscription requests, and the receiver is used by TransportStream to send DiscoveryRequest messages to xDS servers.
  • When the underlying ADS stream closes, the worker retries with exponential backoff, configurable via ClientConfig (see the sketch after this list).
  2. Implement ResourceWatcher and wire it into XdsClient so users can now subscribe to xDS resources.
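For illustration, a hedged sketch of how the backoff configuration might look. The `with_backoff` method and the `ExponentialBackoff` fields are hypothetical names for this example, not the PR's exact API; only `ClientConfig::new` appears in the quoted diffs below.

```rust
use std::time::Duration;

// Hypothetical sketch: configuring retry backoff on the client.
let config = ClientConfig::new(node, "https://xds.example.com:443")
    .with_backoff(ExponentialBackoff {
        initial: Duration::from_millis(500), // first retry delay
        max: Duration::from_secs(30),        // delay cap
        multiplier: 2.0,                     // growth factor per attempt
    });
```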

Some design choice highlights:

  1. Created DecodedResource, a type-erased representation of an xDS resource that carries its decoding function in a closure (see the sketch after this list). AdsWorker sends and receives this type on channels so it can stay transport- and codec-generic.
  2. The ADS stream connection waits for the first subscription from the user. This is because tonic's gRPC stream ::connect() awaits the response headers. Depending on the xDS server implementation, it may not respond with headers until the first subscription, creating a deadlock if we await stream creation before sending any requests. (Btw, grpc-go works around this by running send/recv in different goroutines; here I kept both in the same worker loop to reduce shared-state complexity.)
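A minimal sketch of the type-erasure idea in item 1; the exact field set and the String error type are assumptions for illustration, not the PR's definitions.

```rust
use bytes::Bytes;
use std::any::Any;

// Decodes the raw bytes of a google.protobuf.Any payload into a concrete
// resource type, returned erased behind `dyn Any`. The error type is
// simplified to String for this sketch.
type DecoderFn =
    Box<dyn Fn(&Bytes) -> Result<Box<dyn Any + Send>, String> + Send + Sync>;

// Type-erased resource handle: AdsWorker moves these over channels
// without knowing the concrete resource type or codec.
struct DecodedResource {
    name: String,
    type_url: String,
    raw: Bytes,
    decoder: DecoderFn,
}
```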

Testing

Created a basic.rs example to showcase the user experience. I've used it to test against a local xDS management server and successfully subscribed to multiple Listener resources.

=== xds-client Example ===

Connecting to xDS server: https://[redacted-private-server]
Connected!

Enter listener names to watch (one per line, Ctrl+C to exit):
(Use empty string for wildcard subscription)

[redacted-listener-1]
→ Watching for Listener: '[redacted-listener-1]'
✓ Listener received:
  name:        [redacted-listener-1]
  rds_config:  [redacted-route-1]

[redacted-listener-2]
→ Watching for Listener: '[redacted-listener-2]'
✓ Listener received:
  name:        [redacted-listener-2]
  rds_config:  [redacted-route-2]

✓ Listener received:
  name:        [redacted-listener-1]
  rds_config:  [redacted-route-1]

Next Steps

The current implementation completes the basic functionality end to end, but has these improvement opportunities:

  1. xds-client currently does not cache received resources, so new watchers of an already-subscribed resource must wait for the next response from the xDS server. With resource caching, new watchers would get an immediate response.
  2. If we bring in a proper xDS server implementation, we can run integration tests in CI.
  3. Observability: we'll bring in tracing, logging, and metrics support.
  4. Delta xDS: the current version implements only the State of the World variant of xDS, but in some use cases the delta variant is more efficient and scalable.

Additionally, these features are xDS-related but also gRPC-specific, so we plan to implement them in a separate tonic-xds crate built on this xds-client crate:

  1. Cascading subscriptions to RDS, CDS, and EDS resources from a top-level Listener, for gRPC routing and load balancing.
  2. xDS server connection bootstrapping, including TLS configuration, connection pooling, multi-server fallback, etc.

@YutaoMa YutaoMa marked this pull request as ready for review January 12, 2026 18:43
@arjan-bal arjan-bal self-requested a review January 16, 2026 05:45
arjan-bal (Collaborator) left a comment

Left some initial comments, still going through the PR.

YutaoMa (Collaborator, Author) commented Jan 22, 2026

Addressed all previous comments in the latest commit. @arjan-bal @dfawley

@@ -67,3 +69,65 @@ pub trait Resource: Sized + Send + Sync + 'static {
/// The resource name combined with the type URL uniquely identifies a resource.
fn name(&self) -> &str;
Collaborator
There is another piece of configuration required to indicate whether a resource missing in the ADS response should be interpreted as a removal. gRPC Go stores this as a boolean.

Since RDS resource names are derived from LDS resources and EDS resource names from CDS resources, a missing RDS or EDS resource in the ADS response is not considered a removal. Consequently, the watcher will not receive a ResourceError.

There's some more information in the envoy docs: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist
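For illustration, a sketch of how such a per-type flag could sit on the Resource trait. The constant name matches the one added later in this PR, but its doc text and placement here are assumptions.

```rust
pub trait Resource: Sized + Send + Sync + 'static {
    /// Whether a resource of this type that is missing from a
    /// State-of-the-World ADS response should be treated as deleted.
    /// True for LDS/CDS, where the server always sends the full state;
    /// false for RDS/EDS, whose names are derived from LDS/CDS
    /// resources, so absence is not a removal and watchers receive no
    /// ResourceError.
    const ALL_RESOURCES_REQUIRED_IN_SOTW: bool;

    /// The resource name combined with the type URL uniquely identifies
    /// a resource.
    fn name(&self) -> &str;
}
```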

Collaborator Author

Noted, adding that right now

YutaoMa (Collaborator, Author) commented Jan 22, 2026

Note for self and reviewers: based on gRFC A88 (https://github.com/grpc/proposal/blob/master/A88-xds-data-error-handling.md), ResourceEvent needs to be changed to include ResourceError as a variant of ResourceUpdated, and we'll need to support the server feature regarding timeout and deletion. I plan to implement these as follow-ups as long as they don't block A27 usage.
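A hedged sketch of the shape gRFC A88 implies; the names below are assumptions based on this comment, not the final API.

```rust
// Per gRFC A88, a data error is delivered through the same update event
// rather than as a separate terminal error. ResourceError stands in for
// this crate's error type.
enum ResourceUpdate<T> {
    /// A valid resource was received and validated.
    Valid(T),
    /// The resource failed validation or another data error occurred;
    /// the watcher still observes this as an update.
    Error(ResourceError),
}
```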

Comment on lines +76 to +78
    /// Wildcard subscription - receive all resources of this type.
    /// In xDS protocol, this is represented by an empty resource_names list.
    Wildcard,
Collaborator

Out of curiosity, what is the specific use case for requesting all resources? We haven't encountered a need for this in our other gRPC implementations yet.

Collaborator Author

What I know of is:

Envoy will always use wildcard subscriptions for Listener and Cluster resources.

For gRPC I haven't seen anyone do it.

Collaborator

I see. gRPC likely won't need this since it only subscribes to specific listener resources, whereas Envoy must intercept traffic for all services. If there are no known users of this feature, it might make sense to skip supporting it to keep the implementation simpler.

@arjan-bal arjan-bal removed their assignment Feb 2, 2026
@YutaoMa YutaoMa requested a review from arjan-bal February 3, 2026 00:50
YutaoMa (Collaborator, Author) commented Feb 3, 2026

@arjan-bal re-requested your review. I've implemented the following changes in recent commits according to your comments:

1. `Resource` API change
	1. Added `ALL_RESOURCES_REQUIRED_IN_SOTW` flag
	2. Split up `decode` into `deserialize` and `validate`, specifically allowing name extraction even on failed validation
2. Added resource cache
3. Use references in `DiscoveryRequest`
4. Send resource validation error to specific watcher
5. `Config` API change
	1. Added `non_exhaustive`
	2. Added servers list config and `TransportFactory`
6. Style changes
	1. enum for Wildcard case in WatcherEntry
	2. Return `Result<(), Error>` for `run_connected`

arjan-bal (Collaborator) left a comment

LGTM! I’ve left one primary comment regarding the cancellation of background timers; otherwise, I just have a few nits.

We should aim to improve coverage with end-to-end tests in the future. I expect that will happen once the client is integrated into the Tower layers for Tonic compatibility.

Thanks again; this is a significant contribution!

/// let config = ClientConfig::new(node, "https://xds.example.com:443")
///     .with_resource_initial_timeout(None);
/// ```
pub fn with_resource_initial_timeout(mut self, timeout: Option<Duration>) -> Self {
Collaborator

nit: Should with_resource_initial_timeout avoid taking an Option? If a caller doesn't want to set a timeout, they can just forgo calling the method entirely.

Collaborator Author

I suppose it could still be useful to offer a way to clear the timeout if set.

Comment on lines 952 to 958
self.runtime.spawn(async move {
    runtime.sleep(timeout).await;
    let _ = command_tx.unbounded_send(WorkerCommand::ResourceTimerExpired {
        type_url: type_url_owned,
        name,
    });
});
Collaborator

We should ensure the spawned task is cancelled upon resource receipt so we don't leak any futures.

Collaborator Author

Added a oneshot-channel-based cancellation mechanism.
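A minimal sketch of that mechanism, mirroring the quoted snippet above; `cancel_rx` and the exact select shape are illustrative, not the PR's actual code.

```rust
use futures::channel::oneshot;
use futures::{select, FutureExt};

// The worker keeps the oneshot sender and fires it when the resource
// arrives (dropping it has the same effect), so the timer task exits
// without sending the expiry command and no future is leaked.
self.runtime.spawn(async move {
    select! {
        _ = runtime.sleep(timeout).fuse() => {
            let _ = command_tx.unbounded_send(WorkerCommand::ResourceTimerExpired {
                type_url: type_url_owned,
                name,
            });
        }
        _ = cancel_rx.fuse() => {
            // Cancelled: resource received before the timeout.
        }
    }
});
```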

@arjan-bal arjan-bal assigned YutaoMa and unassigned arjan-bal Feb 5, 2026
LucioFranco (Member) left a comment

Overall looks great. I don't have much feedback on the specific xDS stuff, but we need to switch over to using the tokio crates for the majority of things. The major reason is that they are much better maintained and actively supported, and they have much better correctness.

Additionally, looking at some of the code I wonder if we could wire up tests with https://github.com/tokio-rs/turmoil/ to test the retry behavior etc.

"net",
] }
tonic = { version = "0.14", features = ["tls-ring"] }
clap = { version = "4", features = ["derive"] }
Member

clap is a really heavy dep; we should try to keep it out of the tree and just do the env var parsing manually?
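For example, a sketch of manual parsing for the example binary; the XDS_SERVER_URI variable name is an assumption.

```rust
// Hypothetical: read the server URI from an env var instead of clap,
// falling back to a default for local testing.
let uri = std::env::var("XDS_SERVER_URI")
    .unwrap_or_else(|_| "https://xds.example.com:443".to_string());
```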

#[non_exhaustive]
pub struct ServerConfig {
    /// URI of the management server (e.g., "https://xds.example.com:443").
    pub uri: String,
Member

Even though we have non_exhaustive set, changing the field type (which we may want to do in the future) will be breaking. I suggest that we keep String as an impl detail and expose it as a &str, which allows us to move to, say, an Arc in the future if we want. I would apply this same rule to all public-facing types (if it's not public-facing, it's probably best to make it pub(crate)).
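A sketch of the suggested pattern, using the names from the quoted snippet; the accessor method is hypothetical.

```rust
#[non_exhaustive]
pub struct ServerConfig {
    // Storage type kept private so it can later become e.g. Arc<str>
    // without a breaking API change.
    uri: String,
}

impl ServerConfig {
    /// URI of the management server (e.g., "https://xds.example.com:443").
    pub fn uri(&self) -> &str {
        &self.uri
    }
}
```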

@@ -1,74 +1,160 @@
//! Client interface through which the user can watch and receive updates for xDS resources.

use futures::channel::mpsc;
Member

We need to use tokio here rather than the futures channel

/// This spawns a background task that manages the ADS stream.
/// The task runs until all `XdsClient` handles are dropped.
pub fn build(self) -> XdsClient {
    let (command_tx, command_rx) = mpsc::unbounded();
Member

I think we always want to avoid an unbounded queue? I know feedback loops can get hard here, but I think there is always a way to set an upper bound.
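A sketch of the bounded alternative with tokio, mirroring the quoted `build()` snippet; the buffer constant is an assumption.

```rust
use tokio::sync::mpsc;

// Bounded command channel: send().await applies backpressure when the
// buffer is full, instead of letting the queue grow without bound.
const COMMAND_CHANNEL_BUFFER_SIZE: usize = 128;

let (command_tx, command_rx) =
    mpsc::channel::<WorkerCommand>(COMMAND_CHANNEL_BUFFER_SIZE);
```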

let watcher_id = WatcherId::new();
let (event_tx, event_rx) = mpsc::channel(WATCHER_CHANNEL_BUFFER_SIZE);

let decoder: DecoderFn = Box::new(|bytes| match crate::resource::decode::<T>(bytes) {
Member

What's the advantage of a boxed Fn over an actual trait and trait object?
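For reference, a sketch of the trait-object alternative being asked about; the String error type is simplified, and DecodedResource is as sketched earlier in this thread.

```rust
use bytes::Bytes;

// Named trait instead of a boxed closure: same dynamic dispatch, but
// with a nameable, documentable contract.
trait ResourceDecoder: Send + Sync {
    fn decode(&self, bytes: &Bytes) -> Result<DecodedResource, String>;
}

// The watcher entry would then hold a Box<dyn ResourceDecoder> in place
// of the DecoderFn closure.
```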

use std::sync::Arc;

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
Member

We need to use tokio_stream here.


use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, StreamExt};
Member

@dfawley and I have discussed this, but I would like us to also avoid Sink at all costs right now. It's not a good abstraction for many reasons. We can discuss alternative approaches on Tuesday, or I can break out some time to meet with y'all on it.

/// Returns `Err` if an error occurred and the worker should reconnect.
async fn run_connected<S: TransportStream>(&mut self, mut stream: S) -> Result<()> {
    loop {
        futures::select! {
Member

This needs to use tokio::select! instead of the futures crate's.
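A sketch of the same loop with tokio primitives; the branch bodies are illustrative, assuming command_rx has become a tokio mpsc receiver and Result is this crate's alias.

```rust
use tokio_stream::StreamExt;

async fn run_connected<S: TransportStream>(&mut self, mut stream: S) -> Result<()> {
    loop {
        tokio::select! {
            Some(cmd) = self.command_rx.recv() => {
                // Handle a subscription command from XdsClient.
            }
            Some(response) = stream.next() => {
                // Handle a DiscoveryResponse from the server.
            }
            // Both sources exhausted: the stream closed and all client
            // handles were dropped.
            else => return Ok(()),
        }
    }
}
```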

