-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(xds): implement xDS subscription worker #2478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
YutaoMa
wants to merge
30
commits into
hyperium:master
Choose a base branch
from
YutaoMa:yutaoma/xds-client-worker
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
a8e98c0
feat(xds): implement subscription worker
YutaoMa 4cdc171
feat(xds): basic xds-client usage example
YutaoMa 4e28aa1
fix(xds): deadlock by ADS stream needing response headers
YutaoMa 5d1320b
fix(xds): reset state upon stream reconnect
YutaoMa b42de40
fix(xds): enable io-std in tokio
YutaoMa 1598012
feat(xds): address worker PR comments
YutaoMa dd62e0f
feat(xds): simplify connect loop
YutaoMa fa0b99a
feat(xds): simplify basic example with clap
YutaoMa e2e787d
fix(xds): address Clippy warnings
YutaoMa edd4047
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa ee9c17b
style(xds): unify handle_command
YutaoMa 99c18ec
address PR comments
YutaoMa 882af5b
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 9d40c60
xds: address PR comments
YutaoMa 871a1c2
xds: separate deserialize and validate stage
YutaoMa 093d6c9
xds: add initial Requested state
YutaoMa 4b89ac5
xds: add TransportBuilder
YutaoMa 2d77394
style(xds): fmt
YutaoMa 9f09284
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 55062f9
xds: add resource watch timer
YutaoMa b94bfa1
style(xds): fmt
YutaoMa d32187e
xds: cancellation for resource timer
YutaoMa 6c37b52
xds: fix two logic bugs
YutaoMa 1639c14
xds: replace clap with manual env var parsing
YutaoMa 2dd8dee
xds: hide config internal types
YutaoMa 4153913
xds: replace futures with tokio sync primitives
YutaoMa a15d5b8
xds: bound watch command channel
YutaoMa b6af7a5
doc(xds): update doc
YutaoMa b9fe3d7
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 3bc819a
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| //! Example demonstrating xds-client usage. | ||
| //! | ||
| //! This example shows: | ||
| //! - How to implement the `Resource` trait for Envoy Listener | ||
| //! - How to create an `XdsClient` with tonic transport and prost codec | ||
| //! - How to watch for resources and handle events | ||
| //! | ||
| //! # Usage | ||
| //! | ||
| //! ```sh | ||
| //! # Basic usage | ||
| //! cargo run -p xds-client --example basic -- -l my-listener | ||
| //! | ||
| //! # Multiple listeners | ||
| //! cargo run -p xds-client --example basic -- -l listener-1 -l listener-2 | ||
| //! | ||
| //! # Custom server | ||
| //! cargo run -p xds-client --example basic -- -s http://xds.example.com:18000 -l foo | ||
| //! | ||
| //! # With TLS | ||
| //! cargo run -p xds-client --example basic -- \ | ||
| //! --ca-cert /path/to/ca.pem \ | ||
| //! --client-cert /path/to/client.pem \ | ||
| //! --client-key /path/to/client.key \ | ||
| //! -l my-listener | ||
| //! ``` | ||
|
|
||
| use bytes::Bytes; | ||
| use clap::Parser; | ||
| use envoy_types::pb::envoy::config::listener::v3::Listener as ListenerProto; | ||
| use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{ | ||
| http_connection_manager::RouteSpecifier, HttpConnectionManager, | ||
| }; | ||
| use prost::Message; | ||
| use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; | ||
|
|
||
| use xds_client::resource::TypeUrl; | ||
| use xds_client::{ | ||
| ClientConfig, ProstCodec, Resource, ResourceEvent, TokioRuntime, TonicTransport, XdsClient, | ||
| }; | ||
|
|
||
| /// Example demonstrating xds-client usage. | ||
| #[derive(Parser, Debug)] | ||
| #[command(name = "basic")] | ||
| #[command(about = "xds-client example - watch Listener resources")] | ||
| struct Args { | ||
| /// URI of the xDS management server. | ||
| #[arg(short, long, default_value = "http://localhost:18000")] | ||
| server: String, | ||
|
|
||
| /// Path to CA certificate for TLS (enables TLS when set). | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| #[arg(long)] | ||
| ca_cert: Option<String>, | ||
|
|
||
| /// Path to client certificate for mTLS. | ||
| #[arg(long, requires = "ca_cert")] | ||
| client_cert: Option<String>, | ||
|
|
||
| /// Path to client key for mTLS. | ||
| #[arg(long, requires = "client_cert")] | ||
| client_key: Option<String>, | ||
|
|
||
| /// Listener names to watch (pass multiple: -l foo -l bar). | ||
| #[arg(short, long = "listener", required = true)] | ||
| listeners: Vec<String>, | ||
| } | ||
|
|
||
| // ============================================================================= | ||
|
|
||
| /// A simplified Listener resource for gRPC xDS. | ||
| /// | ||
| /// Extracts the RDS route config name from the ApiListener's HttpConnectionManager. | ||
| #[derive(Debug, Clone)] | ||
| pub struct Listener { | ||
| /// The listener name. | ||
| pub name: String, | ||
| /// The RDS route config name (from HttpConnectionManager). | ||
| pub rds_route_config_name: Option<String>, | ||
| } | ||
|
|
||
| impl Resource for Listener { | ||
| const TYPE_URL: TypeUrl = TypeUrl::new("type.googleapis.com/envoy.config.listener.v3.Listener"); | ||
|
|
||
| fn decode(bytes: Bytes) -> xds_client::Result<Self> { | ||
arjan-bal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let proto = ListenerProto::decode(bytes)?; | ||
|
|
||
| let hcm = proto | ||
| .api_listener | ||
| .and_then(|api| api.api_listener) | ||
| .and_then(|any| HttpConnectionManager::decode(Bytes::from(any.value)).ok()); | ||
|
|
||
| let rds_route_config_name = hcm.and_then(|hcm| match hcm.route_specifier { | ||
| Some(RouteSpecifier::Rds(rds)) => Some(rds.route_config_name), | ||
| _ => None, | ||
| }); | ||
|
|
||
| Ok(Self { | ||
| name: proto.name, | ||
| rds_route_config_name, | ||
| }) | ||
| } | ||
|
|
||
| fn name(&self) -> &str { | ||
| &self.name | ||
| } | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
| let args = Args::parse(); | ||
|
|
||
| println!("=== xds-client Example ===\n"); | ||
| println!("Connecting to xDS server: {}", args.server); | ||
|
|
||
| let config = ClientConfig::with_node_id("example-node").user_agent("grpc"); | ||
|
|
||
| let transport = match &args.ca_cert { | ||
| Some(ca_path) => { | ||
| let ca_cert = std::fs::read_to_string(ca_path)?; | ||
| let mut tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&ca_cert)); | ||
|
|
||
| if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) { | ||
YutaoMa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let client_cert = std::fs::read_to_string(cert_path)?; | ||
| let client_key = std::fs::read_to_string(key_path)?; | ||
| tls = tls.identity(Identity::from_pem(client_cert, client_key)); | ||
| } | ||
|
|
||
| let channel = Channel::from_shared(args.server.clone())? | ||
| .tls_config(tls)? | ||
| .connect() | ||
| .await?; | ||
|
|
||
| TonicTransport::from_channel(channel) | ||
| } | ||
| None => TonicTransport::connect(&args.server).await?, | ||
| }; | ||
|
|
||
| println!("Connected!\n"); | ||
|
|
||
| let client = XdsClient::builder(config, transport, ProstCodec, TokioRuntime).build(); | ||
|
|
||
| let (event_tx, mut event_rx) = | ||
| tokio::sync::mpsc::unbounded_channel::<ResourceEvent<Listener>>(); | ||
|
|
||
| // Start watchers for each listener from args | ||
| for name in &args.listeners { | ||
| println!("→ Watching for Listener: '{name}'"); | ||
|
|
||
| let mut watcher = client.watch::<Listener>(name); | ||
| let tx = event_tx.clone(); | ||
|
|
||
| tokio::spawn(async move { | ||
| while let Some(event) = watcher.next().await { | ||
| if tx.send(event).is_err() { | ||
YutaoMa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| break; | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // Drop the original sender so the loop exits when all watchers complete | ||
| drop(event_tx); | ||
|
|
||
| while let Some(event) = event_rx.recv().await { | ||
| match event { | ||
| ResourceEvent::ResourceChanged { resource, mut done } => { | ||
| println!("✓ Listener received:"); | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| println!(" name: {}", resource.name()); | ||
| if let Some(ref rds) = resource.rds_route_config_name { | ||
| println!(" rds_config: {rds}"); | ||
| } | ||
| println!(); | ||
|
|
||
| // In gRPC xDS, you would cascadingly subscribe to RDS, CDS, EDS, etc. | ||
| // before completing the done signal. | ||
| done.complete(); | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| ResourceEvent::ResourceError { error, .. } => { | ||
| println!("✗ Resource error: {error}"); | ||
| } | ||
|
|
||
| ResourceEvent::AmbientError { error, .. } => { | ||
| println!("⚠ Connection error: {error}"); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| println!("Exiting"); | ||
| Ok(()) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,110 @@ | ||
| //! Configuration for the xDS client. | ||
|
|
||
| use std::time::Duration; | ||
|
|
||
| use crate::message::Node; | ||
|
|
||
| /// Configuration for the xDS client. | ||
| #[derive(Debug, Clone)] | ||
| pub struct ClientConfig { | ||
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // TODO: Add fields as needed | ||
| /// Node identification sent to the xDS server. | ||
| /// | ||
| /// Default: None. | ||
| pub node: Option<Node>, | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /// Initial backoff duration for reconnection attempts. | ||
| /// | ||
| /// Default: 1 second. | ||
| pub initial_backoff: Duration, | ||
|
|
||
| /// Maximum backoff duration for reconnection attempts. | ||
| /// | ||
| /// Default: 30 seconds. | ||
| pub max_backoff: Duration, | ||
|
|
||
| /// Multiplier for exponential backoff. | ||
| /// | ||
| /// After each failed connection attempt, the backoff duration is multiplied | ||
| /// by this value, up to `max_backoff`. | ||
| /// | ||
| /// Default: 2.0. | ||
| pub backoff_multiplier: f64, | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| impl Default for ClientConfig { | ||
| fn default() -> Self { | ||
| Self { | ||
| node: None, | ||
| initial_backoff: Duration::from_secs(1), | ||
| max_backoff: Duration::from_secs(30), | ||
| backoff_multiplier: 2.0, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ClientConfig { | ||
| /// Create a new configuration with the given node identification. | ||
| pub fn new(node: Node) -> Self { | ||
| Self { | ||
| node: Some(node), | ||
| ..Default::default() | ||
| } | ||
| } | ||
|
|
||
| /// Create a new configuration with a node ID. | ||
| pub fn with_node_id(id: impl Into<String>) -> Self { | ||
| Self::new(Node { | ||
| id: id.into(), | ||
| ..Default::default() | ||
| }) | ||
| } | ||
|
|
||
| /// Set the node identification. | ||
| pub fn node(mut self, node: Node) -> Self { | ||
| self.node = Some(node); | ||
| self | ||
| } | ||
|
|
||
| /// Set the user agent name. | ||
| /// | ||
| /// This identifies the client to the xDS server. Some servers require | ||
| /// specific values (e.g., "grpc", "envoy"). | ||
| /// | ||
| /// # Example | ||
| /// | ||
| /// ``` | ||
| /// use xds_client::ClientConfig; | ||
| /// | ||
| /// let config = ClientConfig::with_node_id("my-node") | ||
| /// .user_agent("grpc"); | ||
| /// ``` | ||
| pub fn user_agent(mut self, name: impl Into<String>) -> Self { | ||
| if let Some(ref mut node) = self.node { | ||
| node.user_agent_name = name.into(); | ||
| } else { | ||
| self.node = Some(Node { | ||
| user_agent_name: name.into(), | ||
| ..Default::default() | ||
| }); | ||
| } | ||
| self | ||
| } | ||
YutaoMa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /// Set the initial backoff duration. | ||
| pub fn initial_backoff(mut self, duration: Duration) -> Self { | ||
| self.initial_backoff = duration; | ||
| self | ||
| } | ||
|
|
||
| /// Set the maximum backoff duration. | ||
| pub fn max_backoff(mut self, duration: Duration) -> Self { | ||
| self.max_backoff = duration; | ||
| self | ||
| } | ||
|
|
||
| /// Set the backoff multiplier. | ||
| pub fn backoff_multiplier(mut self, multiplier: f64) -> Self { | ||
| self.backoff_multiplier = multiplier; | ||
| self | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clap is a really heavy dep we should try to keep it out of the tree and just do the env var parsing manually?