feat(xds): implement xDS subscription worker #2478
YutaoMa wants to merge 23 commits into hyperium:master
Conversation
arjan-bal
left a comment
Left some initial comments, still going through the PR.
Addressed all prev comments in the latest commit @arjan-bal @dfawley |
xds-client/src/resource/mod.rs
Outdated
@@ -67,3 +69,65 @@ pub trait Resource: Sized + Send + Sync + 'static {
    /// The resource name combined with the type URL uniquely identifies a resource.
    fn name(&self) -> &str;
There is another piece of configuration required to indicate whether a resource missing in the ADS response should be interpreted as a removal. gRPC Go stores this as a boolean.
Since RDS resource names are derived from LDS resources and EDS resource names from CDS resources, a missing RDS or EDS resource in the ADS response is not considered a removal. Consequently, the watcher will not receive a ResourceError.
There's some more information in the envoy docs: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist
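One way to carry that flag, sketched here with illustrative names (modeled loosely on the boolean gRPC Go stores per resource type, not taken from this PR), is an associated const on a resource-type trait:

```rust
// Illustrative sketch (not from this PR): a per-resource-type flag indicating
// whether absence from a state-of-the-world ADS response means the resource
// was deleted.
trait ResourceTypeMeta {
    /// True for LDS/CDS; false for RDS/EDS, whose names are derived from
    /// LDS/CDS resources, so their absence is not treated as a removal.
    const MISSING_MEANS_REMOVAL: bool;
}

struct Listener;
impl ResourceTypeMeta for Listener {
    const MISSING_MEANS_REMOVAL: bool = true;
}

struct RouteConfiguration;
impl ResourceTypeMeta for RouteConfiguration {
    const MISSING_MEANS_REMOVAL: bool = false;
}
```

With a flag like this, the worker can decide whether to surface a `ResourceError` to watchers when a subscribed name is missing from a response.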
Noted, adding that right now
Note for self and reviewer: based on gRFC A88 https://github.com/grpc/proposal/blob/master/A88-xds-data-error-handling.md, the
    /// Wildcard subscription - receive all resources of this type.
    /// In xDS protocol, this is represented by an empty resource_names list.
    Wildcard,
Out of curiosity, what is the specific use case for requesting all resources? We haven't encountered a need for this in our other gRPC implementations yet.
What I know of:
Envoy will always use wildcard subscriptions for Listener and Cluster resources.
For gRPC, I haven't seen anyone do it.
I see. gRPC likely won't need this since it only subscribes to specific listener resources, whereas Envoy must intercept traffic for all services. If there are no known users of this feature, it might make sense to skip supporting it to keep the implementation simpler.
@arjan-bal re-requested your review, I've implemented the following changes in recent commits according to your comments:
arjan-bal
left a comment
LGTM! I’ve left one primary comment regarding the cancellation of background timers; otherwise, I just have a few nits.
We should aim to improve coverage with end-to-end tests in the future. I expect that will happen once the client is integrated into the Tower layers for Tonic compatibility.
Thanks again—this is a significant contribution!
    /// let config = ClientConfig::new(node, "https://xds.example.com:443")
    ///     .with_resource_initial_timeout(None);
    /// ```
    pub fn with_resource_initial_timeout(mut self, timeout: Option<Duration>) -> Self {
nit: Should with_resource_initial_timeout avoid taking an Option? If a caller doesn't want to set a timeout, they can just forgo calling the method entirely.
I suppose it could still be useful to offer a way to clear the timeout if it was previously set.
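For illustration, a minimal sketch of the trade-off (types simplified from the PR; only the field under discussion is shown): keeping the `Option` parameter lets the same builder method both set and clear the timeout, at the cost of a slightly noisier call site.

```rust
use std::time::Duration;

// Simplified sketch of the builder under discussion: Option<Duration> means
// one method covers both "set a timeout" and "clear a previously set one".
#[derive(Default)]
struct ClientConfig {
    resource_initial_timeout: Option<Duration>,
}

impl ClientConfig {
    fn with_resource_initial_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.resource_initial_timeout = timeout;
        self
    }
}
```

The alternative (taking a plain `Duration`) would need a second method such as a hypothetical `without_resource_initial_timeout()` to support clearing.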
    self.runtime.spawn(async move {
        runtime.sleep(timeout).await;
        let _ = command_tx.unbounded_send(WorkerCommand::ResourceTimerExpired {
            type_url: type_url_owned,
            name,
        });
    });
We should ensure the spawned task is cancelled upon resource receipt so we don't leak any futures.
Added a oneshot-channel-based cancellation mechanism.
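The PR itself uses async channels with the runtime abstraction; as a self-contained illustration of the same cancellation idea using only the standard library (the `spawn_timer` name and message string are hypothetical), a timer thread can wait on a channel so that a cancel signal stops it before it fires:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Std-only sketch of oneshot-style timer cancellation: the timer waits on a
// cancel channel with a timeout instead of sleeping unconditionally.
fn spawn_timer(timeout: Duration, on_expire: mpsc::Sender<&'static str>) -> mpsc::Sender<()> {
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        match cancel_rx.recv_timeout(timeout) {
            // Cancelled before the deadline: the resource arrived, do nothing.
            Ok(()) => {}
            // Deadline elapsed without cancellation: fire the timer event.
            Err(mpsc::RecvTimeoutError::Timeout) => {
                let _ = on_expire.send("ResourceTimerExpired");
            }
            // Note: in this sketch, dropping the cancel handle also cancels.
            Err(mpsc::RecvTimeoutError::Disconnected) => {}
        }
    });
    cancel_tx
}
```

In async code the same shape falls out of selecting between a sleep future and a oneshot receiver, with the sender kept alongside the pending resource entry.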
LucioFranco
left a comment
Overall looks great. I don't have much feedback on the specific xDS stuff, but we need to switch over to using the tokio crates for the majority of things. The major reason is that they are much better maintained and actually supported, as well as having much better correctness.
Additionally, looking at some of the code I wonder if we could wire up tests with https://github.com/tokio-rs/turmoil/ to test the retry behavior etc.
| "net", | ||
| ] } | ||
| tonic = { version = "0.14", features = ["tls-ring"] } | ||
| clap = { version = "4", features = ["derive"] } |
clap is a really heavy dep. Could we keep it out of the tree and just do the env var parsing manually?
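A minimal sketch of what manual parsing could look like for the example binary, assuming a hypothetical `XDS_SERVER_URI` environment variable and default (neither is from the PR):

```rust
use std::env;

// Std-only replacement for clap in a small example binary: read one
// environment variable and fall back to a default target.
fn server_uri_from_env() -> String {
    env::var("XDS_SERVER_URI")
        .unwrap_or_else(|_| "https://xds.example.com:443".to_string())
}
```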
#[non_exhaustive]
pub struct ServerConfig {
    /// URI of the management server (e.g., "https://xds.example.com:443").
    pub uri: String,
Even though we have non_exhaustive set, changing the field type (which we may want to do in the future) will be breaking. I suggest that we keep String as an impl detail and expose it as a &str, which allows us to move to say an Arc in the future if we want. I would apply this same rule to all public facing types (if it's not public facing, it's probably best to make it pub(crate)).
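A sketch of the suggested encapsulation (field and method names assumed from the snippet above; visibility modifiers dropped for this standalone fragment): keep the owned `String` private and hand out `&str`, so the stored type could later change without breaking callers.

```rust
// Sketch of the suggested pattern: the field stays private, and the accessor
// commits only to &str.
#[non_exhaustive]
struct ServerConfig {
    uri: String, // implementation detail, not part of the public API
}

impl ServerConfig {
    fn new(uri: impl Into<String>) -> Self {
        Self { uri: uri.into() }
    }

    /// URI of the management server. Callers only see `&str`, so the stored
    /// type could later become e.g. `Arc<str>` without a breaking change.
    fn uri(&self) -> &str {
        &self.uri
    }
}
```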
@@ -1,74 +1,160 @@
//! Client interface through which the user can watch and receive updates for xDS resources.

use futures::channel::mpsc;
We need to use tokio here rather than the futures channel
    /// This spawns a background task that manages the ADS stream.
    /// The task runs until all `XdsClient` handles are dropped.
    pub fn build(self) -> XdsClient {
        let (command_tx, command_rx) = mpsc::unbounded();
I think we want to avoid an unbounded queue here. I know feedback loops can get hard, but I think there is always a way to set an upper bound.
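As a std-only illustration of the bounded-queue point (in the async code the analogue would be a bounded channel such as `tokio::sync::mpsc::channel(capacity)`), a fixed capacity turns overload into visible backpressure instead of unbounded memory growth:

```rust
use std::sync::mpsc;

// Std illustration of bounded-channel backpressure: once the buffer is full,
// a non-blocking send fails instead of queueing without limit.
fn demo_backpressure() -> bool {
    let (tx, _rx) = mpsc::sync_channel::<u32>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // The third try_send fails because the buffer holds only two messages.
    tx.try_send(3).is_err()
}
```

In the async case the sender would `.await` (or get a `TrySendError`) rather than error synchronously, which is exactly the feedback-loop pressure point mentioned above.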
    let watcher_id = WatcherId::new();
    let (event_tx, event_rx) = mpsc::channel(WATCHER_CHANNEL_BUFFER_SIZE);

    let decoder: DecoderFn = Box::new(|bytes| match crate::resource::decode::<T>(bytes) {
What's the advantage of doing a boxed Fn over an actual trait and trait object?
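A sketch of the two shapes being compared, with illustrative types (the PR's real `DecoderFn` operates on protobuf bytes, not UTF-8): a named trait gives the decoder a self-documenting name and room for extra methods, while the boxed closure is lighter to define at the call site.

```rust
// Approach 1: a boxed closure, as in the snippet above (types simplified).
type DecoderFn = Box<dyn Fn(&[u8]) -> Result<String, String> + Send>;

// Approach 2: a named trait plus trait object. This leaves room for extra
// methods later (e.g. reporting a type URL) and names the concept explicitly.
trait Decoder: Send {
    fn decode(&self, bytes: &[u8]) -> Result<String, String>;
}

struct Utf8Decoder;

impl Decoder for Utf8Decoder {
    fn decode(&self, bytes: &[u8]) -> Result<String, String> {
        String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())
    }
}
```

Functionally a `Box<dyn Fn>` and a single-method trait object are equivalent; the difference is mostly ergonomics and future extensibility.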
use std::sync::Arc;

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, StreamExt};
@dfawley and I have discussed this, but I would like us to also avoid Sink at all costs right now. It's not a good abstraction for many reasons. We can discuss alternative approaches on Tuesday, or I can break out some time to meet with y'all on it.
    /// Returns `Err` if an error occurred and the worker should reconnect.
    async fn run_connected<S: TransportStream>(&mut self, mut stream: S) -> Result<()> {
        loop {
            futures::select! {
This needs to use tokio::select! instead of the futures crate's version.
Motivation
Ref: #2444
With the #2475 transport and codec changes merged, the remaining change required to get the xDS workflow working end to end is to wire them together with `XdsClient` through a worker loop. This PR implements that.
Solution
- `AdsWorker`, a transport/runtime/codegen-agnostic event loop for managing xDS subscriptions and the ADS stream.
- A channel whose sender is used by `XdsClient` to send subscription requests and whose receiver is used by `TransportStream` to send `DiscoveryRequest` to xDS servers.
- `ClientConfig`.
- `ResourceWatcher`, wired up with `XdsClient` so users can now subscribe to xDS resources.
Some design choice highlights:
- `DecodedResource`, a type-erased representation of an xDS resource with its decoding function carried in a closure. `AdsWorker` sends and receives this type on channels so it can stay transport and codec generic.
- tonic's gRPC stream `connect()` awaits the response headers. Depending on the xDS server implementation, it may not respond with headers until the first subscription, creating a deadlock if we await stream creation before sending any requests. (Btw, `grpc-go` works around this by having send/recv in different goroutines; here I kept both in the same worker loop to reduce shared-state complexity.)
Testing
Created a `basic.rs` example to showcase the user experience. I've used it to test against a local xDS management server and successfully subscribed to multiple Listener resources.
Next Steps
The current implementation completes the basic functionality end to end, but has these improvement opportunities:
- `xds-client` currently does not cache received resources. This means new watchers of an already-subscribed resource need to wait for the next response from the xDS server. If we add resource caching, new watchers get an instant response.
Additionally, these features are xDS-related but also gRPC-specific, so we plan to implement them in a separate `tonic-xds` crate, using this `xds-client` crate: