-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(xds): implement xDS subscription worker #2478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
YutaoMa
wants to merge
30
commits into
hyperium:master
Choose a base branch
from
YutaoMa:yutaoma/xds-client-worker
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,390
−242
Open
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
a8e98c0
feat(xds): implement subscription worker
YutaoMa 4cdc171
feat(xds): basic xds-client usage example
YutaoMa 4e28aa1
fix(xds): deadlock by ADS stream needing response headers
YutaoMa 5d1320b
fix(xds): reset state upon stream reconnect
YutaoMa b42de40
fix(xds): enable io-std in tokio
YutaoMa 1598012
feat(xds): address worker PR comments
YutaoMa dd62e0f
feat(xds): simplify connect loop
YutaoMa fa0b99a
feat(xds): simplify basic example with clap
YutaoMa e2e787d
fix(xds): address Clippy warnings
YutaoMa edd4047
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa ee9c17b
style(xds): unify handle_command
YutaoMa 99c18ec
address PR comments
YutaoMa 882af5b
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 9d40c60
xds: address PR comments
YutaoMa 871a1c2
xds: separate deserialize and validate stage
YutaoMa 093d6c9
xds: add initial Requested state
YutaoMa 4b89ac5
xds: add TransportBuilder
YutaoMa 2d77394
style(xds): fmt
YutaoMa 9f09284
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 55062f9
xds: add resource watch timer
YutaoMa b94bfa1
style(xds): fmt
YutaoMa d32187e
xds: cancellation for resource timer
YutaoMa 6c37b52
xds: fix two logic bugs
YutaoMa 1639c14
xds: replace clap with manual env var parsing
YutaoMa 2dd8dee
xds: hide config internal types
YutaoMa 4153913
xds: replace futures with tokio sync primitives
YutaoMa a15d5b8
xds: bound watch command channel
YutaoMa b6af7a5
doc(xds): update doc
YutaoMa b9fe3d7
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa 3bc819a
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,233 @@ | ||
| //! Example demonstrating xds-client usage. | ||
| //! | ||
| //! This example shows: | ||
| //! - How to implement the `Resource` trait for Envoy Listener | ||
| //! - How to create an `XdsClient` with tonic transport and prost codec | ||
| //! - How to watch for resources and handle events | ||
| //! | ||
| //! # Usage | ||
| //! | ||
| //! ```sh | ||
| //! # Basic usage | ||
| //! cargo run -p xds-client --example basic -- -l my-listener | ||
| //! | ||
| //! # Multiple listeners | ||
| //! cargo run -p xds-client --example basic -- -l listener-1 -l listener-2 | ||
| //! | ||
| //! # Custom server | ||
| //! cargo run -p xds-client --example basic -- -s http://xds.example.com:18000 -l foo | ||
| //! | ||
| //! # With TLS | ||
| //! cargo run -p xds-client --example basic -- \ | ||
| //! --ca-cert /path/to/ca.pem \ | ||
| //! --client-cert /path/to/client.pem \ | ||
| //! --client-key /path/to/client.key \ | ||
| //! -l my-listener | ||
| //! ``` | ||
|
|
||
| use bytes::Bytes; | ||
| use clap::Parser; | ||
| use envoy_types::pb::envoy::config::listener::v3::Listener as ListenerProto; | ||
| use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{ | ||
| http_connection_manager::RouteSpecifier, HttpConnectionManager, | ||
| }; | ||
| use prost::Message; | ||
| use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; | ||
|
|
||
| use xds_client::resource::TypeUrl; | ||
| use xds_client::{ | ||
| ClientConfig, Node, ProstCodec, Resource, ResourceEvent, Result as XdsResult, ServerConfig, | ||
| TokioRuntime, TonicTransport, TonicTransportBuilder, TransportBuilder, XdsClient, | ||
| }; | ||
|
|
||
| /// Example demonstrating xds-client usage. | ||
| #[derive(Parser, Debug)] | ||
| #[command(name = "basic")] | ||
| #[command(about = "xds-client example - watch Listener resources")] | ||
| struct Args { | ||
| /// URI of the xDS management server. | ||
| #[arg(short, long, default_value = "http://localhost:18000")] | ||
| server: String, | ||
|
|
||
| /// Path to PEM-encoded CA certificate for TLS (enables TLS when set). | ||
| #[arg(long)] | ||
| ca_cert: Option<String>, | ||
|
|
||
| /// Path to PEM-encoded client certificate for mTLS. | ||
| #[arg(long, requires = "ca_cert")] | ||
| client_cert: Option<String>, | ||
|
|
||
| /// Path to PEM-encoded client key for mTLS. | ||
| #[arg(long, requires = "client_cert")] | ||
| client_key: Option<String>, | ||
|
|
||
| /// Listener names to watch (pass multiple: -l foo -l bar). | ||
| #[arg(short, long = "listener", required = true)] | ||
| listeners: Vec<String>, | ||
| } | ||
|
|
||
| /// A simplified Listener resource for gRPC xDS. | ||
| /// | ||
| /// Extracts the RDS route config name from the ApiListener's HttpConnectionManager. | ||
| #[derive(Debug, Clone)] | ||
| pub struct Listener { | ||
| /// The listener name. | ||
| pub name: String, | ||
| /// The RDS route config name (from HttpConnectionManager). | ||
| pub rds_route_config_name: Option<String>, | ||
| } | ||
|
|
||
| /// Custom transport builder that configures TLS on the channel. | ||
| /// | ||
| /// This demonstrates how to implement a custom [`TransportBuilder`] when you need | ||
| /// TLS or other custom channel configuration. The default [`TonicTransportBuilder`] | ||
| /// creates plain (non-TLS) connections. | ||
| struct TlsTransportBuilder { | ||
| tls_config: ClientTlsConfig, | ||
| } | ||
|
|
||
| impl TransportBuilder for TlsTransportBuilder { | ||
| type Transport = TonicTransport; | ||
|
|
||
| async fn build(&self, server: &ServerConfig) -> XdsResult<Self::Transport> { | ||
| let channel = Channel::from_shared(server.uri.clone()) | ||
| .map_err(|e| xds_client::Error::Connection(e.to_string()))? | ||
| .tls_config(self.tls_config.clone()) | ||
| .map_err(|e| xds_client::Error::Connection(e.to_string()))? | ||
| .connect() | ||
| .await | ||
| .map_err(|e| xds_client::Error::Connection(e.to_string()))?; | ||
|
|
||
| Ok(TonicTransport::from_channel(channel)) | ||
| } | ||
| } | ||
|
|
||
| impl Resource for Listener { | ||
| type Message = ListenerProto; | ||
|
|
||
| const TYPE_URL: TypeUrl = TypeUrl::new("type.googleapis.com/envoy.config.listener.v3.Listener"); | ||
|
|
||
| fn deserialize(bytes: Bytes) -> xds_client::Result<Self::Message> { | ||
| ListenerProto::decode(bytes).map_err(Into::into) | ||
| } | ||
|
|
||
| fn name(message: &Self::Message) -> &str { | ||
| &message.name | ||
| } | ||
|
|
||
| fn validate(message: Self::Message) -> xds_client::Result<Self> { | ||
| let hcm = message | ||
| .api_listener | ||
| .and_then(|api| api.api_listener) | ||
| .and_then(|any| HttpConnectionManager::decode(Bytes::from(any.value)).ok()); | ||
|
|
||
| let rds_route_config_name = hcm.and_then(|hcm| match hcm.route_specifier { | ||
| Some(RouteSpecifier::Rds(rds)) => Some(rds.route_config_name), | ||
| _ => None, | ||
| }); | ||
|
|
||
| Ok(Self { | ||
| name: message.name, | ||
| rds_route_config_name, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
| let args = Args::parse(); | ||
|
|
||
| if args.client_cert.is_some() != args.client_key.is_some() { | ||
| eprintln!("Error: --client-cert and --client-key must be provided together"); | ||
| std::process::exit(1); | ||
| } | ||
|
|
||
| println!("xds-client Example\n"); | ||
| println!("Connecting to xDS server: {}", args.server); | ||
|
|
||
| let node = Node::new("grpc", "1.0").with_id("example-node"); | ||
| let config = ClientConfig::new(node, &args.server); | ||
|
|
||
| let client = match &args.ca_cert { | ||
| Some(ca_path) => { | ||
| let ca_cert = std::fs::read_to_string(ca_path)?; | ||
| let mut tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&ca_cert)); | ||
|
|
||
| if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) { | ||
YutaoMa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let client_cert = std::fs::read_to_string(cert_path)?; | ||
| let client_key = std::fs::read_to_string(key_path)?; | ||
| tls = tls.identity(Identity::from_pem(client_cert, client_key)); | ||
| } | ||
|
|
||
| let tls_builder = TlsTransportBuilder { tls_config: tls }; | ||
| XdsClient::builder(config, tls_builder, ProstCodec, TokioRuntime).build() | ||
| } | ||
| None => XdsClient::builder( | ||
| config, | ||
| TonicTransportBuilder::new(), | ||
| ProstCodec, | ||
| TokioRuntime, | ||
| ) | ||
| .build(), | ||
| }; | ||
|
|
||
| println!("Starting watchers...\n"); | ||
|
|
||
| let (event_tx, mut event_rx) = | ||
| tokio::sync::mpsc::unbounded_channel::<ResourceEvent<Listener>>(); | ||
|
|
||
| // Start watchers for each listener from args | ||
| for name in &args.listeners { | ||
| println!("Watching for Listener: '{name}'"); | ||
|
|
||
| let mut watcher = client.watch::<Listener>(name); | ||
| let tx = event_tx.clone(); | ||
|
|
||
| tokio::spawn(async move { | ||
| while let Some(event) = watcher.next().await { | ||
| if tx.send(event).is_err() { | ||
YutaoMa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| eprintln!("Event channel closed, stopping watcher"); | ||
| break; | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // Drop the original sender so the loop exits when all watchers complete | ||
| drop(event_tx); | ||
|
|
||
| while let Some(event) = event_rx.recv().await { | ||
| match event { | ||
| ResourceEvent::ResourceChanged { | ||
| result: Ok(resource), | ||
| done, | ||
| } => { | ||
| println!("Listener received:"); | ||
| println!(" name: {}", resource.name); | ||
| if let Some(ref rds) = resource.rds_route_config_name { | ||
| println!(" rds_config: {rds}"); | ||
| } | ||
| println!(); | ||
|
|
||
| // In gRPC xDS, you would cascadingly subscribe to RDS, CDS, EDS, etc. | ||
| // The done signal is sent automatically when it's dropped. | ||
| drop(done); | ||
| } | ||
|
|
||
| ResourceEvent::ResourceChanged { | ||
| result: Err(error), .. | ||
| } => { | ||
| // Resource was invalidated (validation error, deleted, etc.) | ||
| println!("Resource invalidated: {error}"); | ||
| } | ||
|
|
||
| ResourceEvent::AmbientError { error, .. } => { | ||
| // Non-fatal error, continue using cached resource if available | ||
| println!("Ambient error: {error}"); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| println!("Exiting"); | ||
| Ok(()) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clap is a really heavy dep we should try to keep it out of the tree and just do the env var parsing manually?