Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a8e98c0
feat(xds): implement subscription worker
YutaoMa Jan 9, 2026
4cdc171
feat(xds): basic xds-client usage example
YutaoMa Jan 9, 2026
4e28aa1
fix(xds): deadlock by ADS stream needing response headers
YutaoMa Jan 12, 2026
5d1320b
fix(xds): reset state upon stream reconnect
YutaoMa Jan 12, 2026
b42de40
fix(xds): enable io-std in tokio
YutaoMa Jan 12, 2026
1598012
feat(xds): address worker PR comments
YutaoMa Jan 14, 2026
dd62e0f
feat(xds): simplify connect loop
YutaoMa Jan 14, 2026
fa0b99a
feat(xds): simplify basic example with clap
YutaoMa Jan 14, 2026
e2e787d
fix(xds): address Clippy warnings
YutaoMa Jan 14, 2026
edd4047
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Jan 15, 2026
ee9c17b
style(xds): unify handle_command
YutaoMa Jan 16, 2026
99c18ec
address PR comments
YutaoMa Jan 21, 2026
882af5b
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Jan 22, 2026
9d40c60
xds: address PR comments
YutaoMa Feb 2, 2026
871a1c2
xds: separate deserialize and validate stage
YutaoMa Feb 2, 2026
093d6c9
xds: add initial Requested state
YutaoMa Feb 2, 2026
4b89ac5
xds: add TransportBuilder
YutaoMa Feb 3, 2026
2d77394
style(xds): fmt
YutaoMa Feb 3, 2026
9f09284
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 3, 2026
55062f9
xds: add resource watch timer
YutaoMa Feb 3, 2026
b94bfa1
style(xds): fmt
YutaoMa Feb 3, 2026
d32187e
xds: cancellation for resource timer
YutaoMa Feb 5, 2026
6c37b52
xds: fix two logic bugs
YutaoMa Feb 5, 2026
1639c14
xds: replace clap with manual env var parsing
YutaoMa Feb 9, 2026
2dd8dee
xds: hide config internal types
YutaoMa Feb 9, 2026
4153913
xds: replace futures with tokio sync primitives
YutaoMa Feb 9, 2026
a15d5b8
xds: bound watch command channel
YutaoMa Feb 9, 2026
b6af7a5
doc(xds): update doc
YutaoMa Feb 10, 2026
b9fe3d7
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 10, 2026
3bc819a
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions xds-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ workspace = true
[dependencies]
bytes = "1.11.0"
thiserror = "2"
futures-channel = "0.3"
futures = "0.3"

# Optional dependencies for tonic transport
tonic = { version = "0.14", optional = true }
Expand All @@ -40,11 +40,21 @@ rt-tokio = ["dep:tokio"]
codegen-prost = ["dep:envoy-types", "dep:prost"]

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] }
tokio = { version = "1", features = [
"rt-multi-thread",
"macros",
"net",
] }
tonic = { version = "0.14", features = ["tls-ring"] }
clap = { version = "4", features = ["derive"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clap is a really heavy dep we should try to keep it out of the tree and just do the env var parsing manually?

async-stream = "0.3"
envoy-types = "0.7"
prost = "0.14"

[[example]]
name = "basic"
path = "examples/basic.rs"

[package.metadata.cargo_check_external_types]
allowed_external_types = [
# major released
Expand Down
233 changes: 233 additions & 0 deletions xds-client/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//! Example demonstrating xds-client usage.
//!
//! This example shows:
//! - How to implement the `Resource` trait for Envoy Listener
//! - How to create an `XdsClient` with tonic transport and prost codec
//! - How to watch for resources and handle events
//!
//! # Usage
//!
//! ```sh
//! # Basic usage
//! cargo run -p xds-client --example basic -- -l my-listener
//!
//! # Multiple listeners
//! cargo run -p xds-client --example basic -- -l listener-1 -l listener-2
//!
//! # Custom server
//! cargo run -p xds-client --example basic -- -s http://xds.example.com:18000 -l foo
//!
//! # With TLS
//! cargo run -p xds-client --example basic -- \
//! --ca-cert /path/to/ca.pem \
//! --client-cert /path/to/client.pem \
//! --client-key /path/to/client.key \
//! -l my-listener
//! ```

use bytes::Bytes;
use clap::Parser;
use envoy_types::pb::envoy::config::listener::v3::Listener as ListenerProto;
use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{
http_connection_manager::RouteSpecifier, HttpConnectionManager,
};
use prost::Message;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};

use xds_client::resource::TypeUrl;
use xds_client::{
ClientConfig, Node, ProstCodec, Resource, ResourceEvent, Result as XdsResult, ServerConfig,
TokioRuntime, TonicTransport, TonicTransportBuilder, TransportBuilder, XdsClient,
};

/// Example demonstrating xds-client usage.
#[derive(Parser, Debug)]
#[command(name = "basic")]
#[command(about = "xds-client example - watch Listener resources")]
struct Args {
/// URI of the xDS management server.
#[arg(short, long, default_value = "http://localhost:18000")]
server: String,

/// Path to PEM-encoded CA certificate for TLS (enables TLS when set).
#[arg(long)]
ca_cert: Option<String>,

/// Path to PEM-encoded client certificate for mTLS.
#[arg(long, requires = "ca_cert")]
client_cert: Option<String>,

/// Path to PEM-encoded client key for mTLS.
#[arg(long, requires = "client_cert")]
client_key: Option<String>,

/// Listener names to watch (pass multiple: -l foo -l bar).
#[arg(short, long = "listener", required = true)]
listeners: Vec<String>,
}

/// A simplified Listener resource for gRPC xDS.
///
/// Extracts the RDS route config name from the ApiListener's HttpConnectionManager.
#[derive(Debug, Clone)]
pub struct Listener {
/// The listener name.
pub name: String,
/// The RDS route config name (from HttpConnectionManager).
pub rds_route_config_name: Option<String>,
}

/// Custom transport builder that configures TLS on the channel.
///
/// This demonstrates how to implement a custom [`TransportBuilder`] when you need
/// TLS or other custom channel configuration. The default [`TonicTransportBuilder`]
/// creates plain (non-TLS) connections.
struct TlsTransportBuilder {
tls_config: ClientTlsConfig,
}

impl TransportBuilder for TlsTransportBuilder {
type Transport = TonicTransport;

async fn build(&self, server: &ServerConfig) -> XdsResult<Self::Transport> {
let channel = Channel::from_shared(server.uri.clone())
.map_err(|e| xds_client::Error::Connection(e.to_string()))?
.tls_config(self.tls_config.clone())
.map_err(|e| xds_client::Error::Connection(e.to_string()))?
.connect()
.await
.map_err(|e| xds_client::Error::Connection(e.to_string()))?;

Ok(TonicTransport::from_channel(channel))
}
}

impl Resource for Listener {
type Message = ListenerProto;

const TYPE_URL: TypeUrl = TypeUrl::new("type.googleapis.com/envoy.config.listener.v3.Listener");

fn deserialize(bytes: Bytes) -> xds_client::Result<Self::Message> {
ListenerProto::decode(bytes).map_err(Into::into)
}

fn name(message: &Self::Message) -> &str {
&message.name
}

fn validate(message: Self::Message) -> xds_client::Result<Self> {
let hcm = message
.api_listener
.and_then(|api| api.api_listener)
.and_then(|any| HttpConnectionManager::decode(Bytes::from(any.value)).ok());

let rds_route_config_name = hcm.and_then(|hcm| match hcm.route_specifier {
Some(RouteSpecifier::Rds(rds)) => Some(rds.route_config_name),
_ => None,
});

Ok(Self {
name: message.name,
rds_route_config_name,
})
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

if args.client_cert.is_some() != args.client_key.is_some() {
eprintln!("Error: --client-cert and --client-key must be provided together");
std::process::exit(1);
}

println!("xds-client Example\n");
println!("Connecting to xDS server: {}", args.server);

let node = Node::new("grpc", "1.0").with_id("example-node");
let config = ClientConfig::new(node, &args.server);

let client = match &args.ca_cert {
Some(ca_path) => {
let ca_cert = std::fs::read_to_string(ca_path)?;
let mut tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&ca_cert));

if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) {
let client_cert = std::fs::read_to_string(cert_path)?;
let client_key = std::fs::read_to_string(key_path)?;
tls = tls.identity(Identity::from_pem(client_cert, client_key));
}

let tls_builder = TlsTransportBuilder { tls_config: tls };
XdsClient::builder(config, tls_builder, ProstCodec, TokioRuntime).build()
}
None => XdsClient::builder(
config,
TonicTransportBuilder::new(),
ProstCodec,
TokioRuntime,
)
.build(),
};

println!("Starting watchers...\n");

let (event_tx, mut event_rx) =
tokio::sync::mpsc::unbounded_channel::<ResourceEvent<Listener>>();

// Start watchers for each listener from args
for name in &args.listeners {
println!("Watching for Listener: '{name}'");

let mut watcher = client.watch::<Listener>(name);
let tx = event_tx.clone();

tokio::spawn(async move {
while let Some(event) = watcher.next().await {
if tx.send(event).is_err() {
eprintln!("Event channel closed, stopping watcher");
break;
}
}
});
}

// Drop the original sender so the loop exits when all watchers complete
drop(event_tx);

while let Some(event) = event_rx.recv().await {
match event {
ResourceEvent::ResourceChanged {
result: Ok(resource),
done,
} => {
println!("Listener received:");
println!(" name: {}", resource.name);
if let Some(ref rds) = resource.rds_route_config_name {
println!(" rds_config: {rds}");
}
println!();

// In gRPC xDS, you would cascadingly subscribe to RDS, CDS, EDS, etc.
// The done signal is sent automatically when it's dropped.
drop(done);
}

ResourceEvent::ResourceChanged {
result: Err(error), ..
} => {
// Resource was invalidated (validation error, deleted, etc.)
println!("Resource invalidated: {error}");
}

ResourceEvent::AmbientError { error, .. } => {
// Non-fatal error, continue using cached resource if available
println!("Ambient error: {error}");
}
}
}

println!("Exiting");
Ok(())
}
Loading
Loading