Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a8e98c0
feat(xds): implement subscription worker
YutaoMa Jan 9, 2026
4cdc171
feat(xds): basic xds-client usage example
YutaoMa Jan 9, 2026
4e28aa1
fix(xds): deadlock by ADS stream needing response headers
YutaoMa Jan 12, 2026
5d1320b
fix(xds): reset state upon stream reconnect
YutaoMa Jan 12, 2026
b42de40
fix(xds): enable io-std in tokio
YutaoMa Jan 12, 2026
1598012
feat(xds): address worker PR comments
YutaoMa Jan 14, 2026
dd62e0f
feat(xds): simplify connect loop
YutaoMa Jan 14, 2026
fa0b99a
feat(xds): simplify basic example with clap
YutaoMa Jan 14, 2026
e2e787d
fix(xds): address Clippy warnings
YutaoMa Jan 14, 2026
edd4047
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Jan 15, 2026
ee9c17b
style(xds): unify handle_command
YutaoMa Jan 16, 2026
99c18ec
address PR comments
YutaoMa Jan 21, 2026
882af5b
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Jan 22, 2026
9d40c60
xds: address PR comments
YutaoMa Feb 2, 2026
871a1c2
xds: separate deserialize and validate stage
YutaoMa Feb 2, 2026
093d6c9
xds: add initial Requested state
YutaoMa Feb 2, 2026
4b89ac5
xds: add TransportBuilder
YutaoMa Feb 3, 2026
2d77394
style(xds): fmt
YutaoMa Feb 3, 2026
9f09284
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 3, 2026
55062f9
xds: add resource watch timer
YutaoMa Feb 3, 2026
b94bfa1
style(xds): fmt
YutaoMa Feb 3, 2026
d32187e
xds: cancellation for resource timer
YutaoMa Feb 5, 2026
6c37b52
xds: fix two logic bugs
YutaoMa Feb 5, 2026
1639c14
xds: replace clap with manual env var parsing
YutaoMa Feb 9, 2026
2dd8dee
xds: hide config internal types
YutaoMa Feb 9, 2026
4153913
xds: replace futures with tokio sync primitives
YutaoMa Feb 9, 2026
a15d5b8
xds: bound watch command channel
YutaoMa Feb 9, 2026
b6af7a5
doc(xds): update doc
YutaoMa Feb 10, 2026
b9fe3d7
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 10, 2026
3bc819a
Merge branch 'master' into yutaoma/xds-client-worker
YutaoMa Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions xds-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ workspace = true
[dependencies]
bytes = "1.11.0"
thiserror = "2"
futures-channel = "0.3"
futures = "0.3"
uuid = { version = "1", features = ["v4"] }

# Optional dependencies for tonic transport
tonic = { version = "0.14", optional = true }
Expand All @@ -40,11 +41,21 @@ rt-tokio = ["dep:tokio"]
codegen-prost = ["dep:envoy-types", "dep:prost"]

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] }
tokio = { version = "1", features = [
"rt-multi-thread",
"macros",
"net",
] }
tonic = { version = "0.14", features = ["tls-ring"] }
clap = { version = "4", features = ["derive"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clap is a really heavy dep we should try to keep it out of the tree and just do the env var parsing manually?

async-stream = "0.3"
envoy-types = "0.7"
prost = "0.14"

[[example]]
name = "basic"
path = "examples/basic.rs"

[package.metadata.cargo_check_external_types]
allowed_external_types = [
# major released
Expand Down
191 changes: 191 additions & 0 deletions xds-client/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//! Example demonstrating xds-client usage.
//!
//! This example shows:
//! - How to implement the `Resource` trait for Envoy Listener
//! - How to create an `XdsClient` with tonic transport and prost codec
//! - How to watch for resources and handle events
//!
//! # Usage
//!
//! ```sh
//! # Basic usage
//! cargo run -p xds-client --example basic -- -l my-listener
//!
//! # Multiple listeners
//! cargo run -p xds-client --example basic -- -l listener-1 -l listener-2
//!
//! # Custom server
//! cargo run -p xds-client --example basic -- -s http://xds.example.com:18000 -l foo
//!
//! # With TLS
//! cargo run -p xds-client --example basic -- \
//! --ca-cert /path/to/ca.pem \
//! --client-cert /path/to/client.pem \
//! --client-key /path/to/client.key \
//! -l my-listener
//! ```

use bytes::Bytes;
use clap::Parser;
use envoy_types::pb::envoy::config::listener::v3::Listener as ListenerProto;
use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{
http_connection_manager::RouteSpecifier, HttpConnectionManager,
};
use prost::Message;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};

use xds_client::resource::TypeUrl;
use xds_client::{
ClientConfig, ProstCodec, Resource, ResourceEvent, TokioRuntime, TonicTransport, XdsClient,
};

/// Example demonstrating xds-client usage.
#[derive(Parser, Debug)]
#[command(name = "basic")]
#[command(about = "xds-client example - watch Listener resources")]
struct Args {
/// URI of the xDS management server.
#[arg(short, long, default_value = "http://localhost:18000")]
server: String,

/// Path to CA certificate for TLS (enables TLS when set).
#[arg(long)]
ca_cert: Option<String>,

/// Path to client certificate for mTLS.
#[arg(long, requires = "ca_cert")]
client_cert: Option<String>,

/// Path to client key for mTLS.
#[arg(long, requires = "client_cert")]
client_key: Option<String>,

/// Listener names to watch (pass multiple: -l foo -l bar).
#[arg(short, long = "listener", required = true)]
listeners: Vec<String>,
}

// =============================================================================

/// A simplified Listener resource for gRPC xDS.
///
/// Extracts the RDS route config name from the ApiListener's HttpConnectionManager.
#[derive(Debug, Clone)]
pub struct Listener {
/// The listener name.
pub name: String,
/// The RDS route config name (from HttpConnectionManager).
pub rds_route_config_name: Option<String>,
}

impl Resource for Listener {
const TYPE_URL: TypeUrl = TypeUrl::new("type.googleapis.com/envoy.config.listener.v3.Listener");

fn decode(bytes: Bytes) -> xds_client::Result<Self> {
let proto = ListenerProto::decode(bytes)?;

let hcm = proto
.api_listener
.and_then(|api| api.api_listener)
.and_then(|any| HttpConnectionManager::decode(Bytes::from(any.value)).ok());

let rds_route_config_name = hcm.and_then(|hcm| match hcm.route_specifier {
Some(RouteSpecifier::Rds(rds)) => Some(rds.route_config_name),
_ => None,
});

Ok(Self {
name: proto.name,
rds_route_config_name,
})
}

fn name(&self) -> &str {
&self.name
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

println!("=== xds-client Example ===\n");
println!("Connecting to xDS server: {}", args.server);

let config = ClientConfig::with_node_id("example-node").user_agent("grpc");

let transport = match &args.ca_cert {
Some(ca_path) => {
let ca_cert = std::fs::read_to_string(ca_path)?;
let mut tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&ca_cert));

if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) {
let client_cert = std::fs::read_to_string(cert_path)?;
let client_key = std::fs::read_to_string(key_path)?;
tls = tls.identity(Identity::from_pem(client_cert, client_key));
}

let channel = Channel::from_shared(args.server.clone())?
.tls_config(tls)?
.connect()
.await?;

TonicTransport::from_channel(channel)
}
None => TonicTransport::connect(&args.server).await?,
};

println!("Connected!\n");

let client = XdsClient::builder(config, transport, ProstCodec, TokioRuntime).build();

let (event_tx, mut event_rx) =
tokio::sync::mpsc::unbounded_channel::<ResourceEvent<Listener>>();

// Start watchers for each listener from args
for name in &args.listeners {
println!("→ Watching for Listener: '{name}'");

let mut watcher = client.watch::<Listener>(name);
let tx = event_tx.clone();

tokio::spawn(async move {
while let Some(event) = watcher.next().await {
if tx.send(event).is_err() {
break;
}
}
});
}

// Drop the original sender so the loop exits when all watchers complete
drop(event_tx);

while let Some(event) = event_rx.recv().await {
match event {
ResourceEvent::ResourceChanged { resource, mut done } => {
println!("✓ Listener received:");
println!(" name: {}", resource.name());
if let Some(ref rds) = resource.rds_route_config_name {
println!(" rds_config: {rds}");
}
println!();

// In gRPC xDS, you would cascadingly subscribe to RDS, CDS, EDS, etc.
// before completing the done signal.
done.complete();
}

ResourceEvent::ResourceError { error, .. } => {
println!("✗ Resource error: {error}");
}

ResourceEvent::AmbientError { error, .. } => {
println!("⚠ Connection error: {error}");
}
}
}

println!("Exiting");
Ok(())
}
105 changes: 104 additions & 1 deletion xds-client/src/client/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,110 @@
//! Configuration for the xDS client.

use std::time::Duration;

use crate::message::Node;

/// Configuration for the xDS client.
#[derive(Debug, Clone)]
pub struct ClientConfig {
// TODO: Add fields as needed
/// Node identification sent to the xDS server.
///
/// Default: None.
pub node: Option<Node>,

/// Initial backoff duration for reconnection attempts.
///
/// Default: 1 second.
pub initial_backoff: Duration,

/// Maximum backoff duration for reconnection attempts.
///
/// Default: 30 seconds.
pub max_backoff: Duration,

/// Multiplier for exponential backoff.
///
/// After each failed connection attempt, the backoff duration is multiplied
/// by this value, up to `max_backoff`.
///
/// Default: 2.0.
pub backoff_multiplier: f64,
}

impl Default for ClientConfig {
fn default() -> Self {
Self {
node: None,
initial_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(30),
backoff_multiplier: 2.0,
}
}
}

impl ClientConfig {
/// Create a new configuration with the given node identification.
pub fn new(node: Node) -> Self {
Self {
node: Some(node),
..Default::default()
}
}

/// Create a new configuration with a node ID.
pub fn with_node_id(id: impl Into<String>) -> Self {
Self::new(Node {
id: id.into(),
..Default::default()
})
}

/// Set the node identification.
pub fn node(mut self, node: Node) -> Self {
self.node = Some(node);
self
}

/// Set the user agent name.
///
/// This identifies the client to the xDS server. Some servers require
/// specific values (e.g., "grpc", "envoy").
///
/// # Example
///
/// ```
/// use xds_client::ClientConfig;
///
/// let config = ClientConfig::with_node_id("my-node")
/// .user_agent("grpc");
/// ```
pub fn user_agent(mut self, name: impl Into<String>) -> Self {
if let Some(ref mut node) = self.node {
node.user_agent_name = name.into();
} else {
self.node = Some(Node {
user_agent_name: name.into(),
..Default::default()
});
}
self
}

/// Set the initial backoff duration.
pub fn initial_backoff(mut self, duration: Duration) -> Self {
self.initial_backoff = duration;
self
}

/// Set the maximum backoff duration.
pub fn max_backoff(mut self, duration: Duration) -> Self {
self.max_backoff = duration;
self
}

/// Set the backoff multiplier.
pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
self.backoff_multiplier = multiplier;
self
}
}
Loading
Loading