Skip to content
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
bf6be90
.
KeranYang Jan 31, 2026
ee83692
http/1.1
KeranYang Jan 31, 2026
f3459f4
handling incoming connections
KeranYang Jan 31, 2026
9863e40
naming
KeranYang Jan 31, 2026
ca861b6
http server
KeranYang Feb 1, 2026
68b94a3
small cleanup
KeranYang Feb 1, 2026
8d2a20b
tracing
KeranYang Feb 1, 2026
55b89d0
nit
KeranYang Feb 2, 2026
3a39b05
no need for axum for now
KeranYang Feb 2, 2026
b18313e
.
KeranYang Feb 2, 2026
b53d25b
.
KeranYang Feb 2, 2026
c0dcb00
Merge branch 'main' into httpsever
KeranYang Feb 2, 2026
2bf1949
fix dep issues
KeranYang Feb 2, 2026
b5b2022
fix clippy
KeranYang Feb 2, 2026
cf69384
minimal changes
KeranYang Feb 2, 2026
52e813e
.
KeranYang Feb 2, 2026
c2d13b4
.
KeranYang Feb 3, 2026
66910f5
Merge branch 'main' into httpsever
KeranYang Feb 3, 2026
c9813f1
Merge branch 'main' into httpsever
KeranYang Feb 5, 2026
9ff9415
gracefully shutdown
KeranYang Feb 4, 2026
0a939e7
separate files
KeranYang Feb 4, 2026
2476929
add unit test for the connection acceptor
KeranYang Feb 5, 2026
c2e56af
.
KeranYang Feb 5, 2026
4d33d0c
unit tests for run_grpc_server
KeranYang Feb 5, 2026
f301ca3
unit test for http server
KeranYang Feb 5, 2026
ed5823d
error
KeranYang Feb 6, 2026
53eb0f6
ut done
KeranYang Feb 6, 2026
234c36a
.
KeranYang Feb 6, 2026
1621a13
Merge branch 'main' into httpsever
KeranYang Feb 6, 2026
6dc7589
fix ut
KeranYang Feb 6, 2026
1f5578c
clippy
KeranYang Feb 6, 2026
cba8e26
fix ut
KeranYang Feb 7, 2026
624a99b
ut
KeranYang Feb 7, 2026
7ccc22b
clippy
KeranYang Feb 7, 2026
0581ee8
.
KeranYang Feb 8, 2026
88f777c
abstract tcp/tls to conn acceptor
KeranYang Feb 8, 2026
d1e2f36
.
KeranYang Feb 8, 2026
43f482a
test helper func
KeranYang Feb 9, 2026
0010d02
.
KeranYang Feb 9, 2026
a8fec31
Merge branch 'main' into httpsever
KeranYang Feb 9, 2026
88156d3
.
KeranYang Feb 9, 2026
d951804
simplify
KeranYang Feb 17, 2026
efdb10e
cleanup
KeranYang Feb 18, 2026
24ea2bb
ut
KeranYang Feb 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions rust/numaflow-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ tokio.workspace = true
numaflow-pb.workspace = true
rcgen.workspace = true
time.workspace = true
rustls.workspace = true
http.workspace = true
bytes.workspace = true
tokio-stream.workspace = true

# Daemon-specific dependencies
http-body-util = "0.1.2"
hyper = "1.6.0"
hyper-util = "0.1.10"
tokio-rustls = { version = "0.26.2", default-features = false, features = ["logging", "tls12"] }

[lints]
workspace = true
162 changes: 142 additions & 20 deletions rust/numaflow-daemon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use bytes::Bytes;
use http::{Request as HttpRequest, Response as HttpResponse, StatusCode};
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use numaflow_pb::servers::mvtxdaemon::mono_vertex_daemon_service_server::{
MonoVertexDaemonService, MonoVertexDaemonServiceServer,
};
Expand All @@ -8,15 +15,24 @@ use numaflow_pb::servers::mvtxdaemon::{
use rcgen::{
CertificateParams, DistinguishedName, DnType, ExtendedKeyUsagePurpose, KeyPair, KeyUsagePurpose,
};
use rustls::ServerConfig;
use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer};
use std::collections::HashMap;
use std::convert::Infallible;
use std::error::Error;
use std::net::SocketAddr;
use std::result::Result;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_rustls::TlsAcceptor;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tracing::info;
use tracing::{info, warn};

#[derive(Debug, Default)]
pub struct MvtxDaemonService;

#[tonic::async_trait]
Expand All @@ -25,8 +41,6 @@ impl MonoVertexDaemonService for MvtxDaemonService {
&self,
_: Request<()>,
) -> Result<Response<GetMonoVertexMetricsResponse>, Status> {
info!("Received GetMonoVertexMetrics");

let mock_processing_rates = HashMap::from([
("default".to_string(), 67.0),
("1m".to_string(), 10.0),
Expand Down Expand Up @@ -56,8 +70,6 @@ impl MonoVertexDaemonService for MvtxDaemonService {
&self,
_: Request<()>,
) -> Result<Response<GetMonoVertexStatusResponse>, Status> {
info!("Received GetMonoVertexStatus");

let mock_resp = GetMonoVertexStatusResponse {
status: Some(MonoVertexStatus {
status: "mock_status".to_string(),
Expand All @@ -73,8 +85,6 @@ impl MonoVertexDaemonService for MvtxDaemonService {
&self,
_: Request<GetMonoVertexErrorsRequest>,
) -> Result<Response<GetMonoVertexErrorsResponse>, Status> {
info!("Received GetMonoVertexErrors");

let mock_resp = GetMonoVertexErrorsResponse {
errors: vec![ReplicaErrors {
replica: "mock_replica".to_string(),
Expand All @@ -92,18 +102,117 @@ const DAEMON_SERVICE_PORT: u16 = 4327;
pub async fn run_monovertex(mvtx_name: String) -> Result<(), Box<dyn Error>> {
info!("MonoVertex name is {}", mvtx_name);

let addr = format!("[::]:{}", DAEMON_SERVICE_PORT).parse()?;
// Create a TCP listener that can listen to both h2 and http 1.1.
let addr: SocketAddr = format!("[::]:{}", DAEMON_SERVICE_PORT).parse()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we split this more functions like start_grpc_server(),
start_http_server(), serve_requests()? to make it more readable also makes it easy to review the code.

let tcp_listener = TcpListener::bind(addr).await?;
let tls_config = generate_self_signed_tls_config()?;
let tls_acceptor = TlsAcceptor::from(tls_config);

// Create two channels, one serving gRPC requests, the other HTTP.
// Given the request rate a daemon server expect to receive, a buffer size of 1000 should be sufficent.
let (grpc_tx, grpc_rx) = mpsc::channel(1000);
let (http_tx, mut http_rx) = mpsc::channel(1000);

// Start a thread to accept requests.
let _accept_req_task = tokio::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure tokio tasks exit? spawned tasks are not tracked

loop {
// Accept a connection.
let (tcp, peer_addr) = match tcp_listener.accept().await {
Ok(v) => v,
Err(e) => {
warn!(error = %e, "Failed to accept a TCP connection");
continue;
}
};
// Handle the new connection.
// Start a new thread so that we don't block accepting other connections.

let grpc_sender = grpc_tx.clone();
let http_sender = http_tx.clone();
let acceptor = tls_acceptor.clone();

tokio::spawn(async move {
let stream = match acceptor.accept(tcp).await {
Ok(s) => s,
Err(e) => {
warn!(peer_addr = %peer_addr, error = %e, "TLS handshake failed.");
// TLS handshake failed, skip handling this connection.
return;
}
};

let alpn = stream
.get_ref()
.1
.alpn_protocol()
.map(|p| String::from_utf8_lossy(p).into_owned());

let service = MvtxDaemonService;
let identity = generate_self_signed_identity()?;
let tls = ServerTlsConfig::new().identity(identity);
match alpn.as_deref() {
Some("http/1.1") => {
// Send to the HTTP channel.
let _ = http_sender.send(stream).await;
}
Some("h2") => {
// Send to the gRPC channel.
let _ = grpc_sender.send(stream).await;
}
_ => {
// Send to the HTTP channel by default.
// This is because most of the time, HTTP is used for communication.
// On Numaflow, if a client is sending a gRPC request, the h2 protocol is explicitly used.
let _ = http_sender.send(stream).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log or handle errors?

}
}
});
}
});

Server::builder()
.tls_config(tls)?
.add_service(MonoVertexDaemonServiceServer::new(service))
.serve(addr)
.await?;
// Start a thread to serve gRPC requests.
let grpc_server_task = tokio::spawn(async move {
let grpc_service = MonoVertexDaemonServiceServer::new(MvtxDaemonService);
let incoming_stream = ReceiverStream::new(grpc_rx).map(Ok::<_, std::io::Error>);
Server::builder()
.add_service(grpc_service)
.serve_with_incoming(incoming_stream)
.await
});

// Start a thread to serve HTTP requests.
let _http_server_task = tokio::spawn(async move {
// TODO - _svc_for_http will be used for serving HTTP requests.
let _svc_for_http = MvtxDaemonService;
while let Some(stream) = http_rx.recv().await {
tokio::spawn(async move {
let svc = service_fn(|req: HttpRequest<Incoming>| async move {
let method = req.method().clone();
let path = req.uri().path().to_string();

let resp = match (method.as_str(), path.as_str()) {
("GET", "/readyz" | "/livez") => HttpResponse::builder()
.status(StatusCode::NO_CONTENT)
.body(Full::new(Bytes::new()))
.unwrap(),
// TODO - add remaining endpoints.
_ => HttpResponse::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::new()))
.unwrap(),
};

// Every error case is translated to a corresponding Response, hence infallible.
Ok::<HttpResponse<Full<Bytes>>, Infallible>(resp)
});
let io = TokioIo::new(stream);
let _ = http1::Builder::new().serve_connection(io, svc).await;
});
}
});

// Wait for the gRPC server to finish.
let grpc_res = grpc_server_task.await?;
grpc_res?;

// TODO - Gracefully shutdown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

graceful shutdown is critical, can we tackle that before merging the PR?

Ok(())
}

Expand All @@ -113,7 +222,7 @@ pub async fn run_pipeline(pipeline_name: String) -> Result<(), Box<dyn Error>> {
Ok(())
}

fn generate_self_signed_identity() -> Result<Identity, Box<dyn Error>> {
fn generate_self_signed_tls_config() -> Result<Arc<ServerConfig>, Box<dyn Error>> {
let mut params = CertificateParams::new(vec!["localhost".to_string()])?;

let mut dn = DistinguishedName::new();
Expand All @@ -134,5 +243,18 @@ fn generate_self_signed_identity() -> Result<Identity, Box<dyn Error>> {
let signing_key = KeyPair::generate()?;
let cert = params.self_signed(&signing_key)?;

Ok(Identity::from_pem(cert.pem(), signing_key.serialize_pem()))
let cert_der = cert.der().clone();
let key_der = PrivatePkcs8KeyDer::from(signing_key.serialize_der());
let key_der = PrivateKeyDer::from(key_der);

let mut cfg = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert_der], key_der)?;

// Serve both http and gRPC.
// Note: order matters, most preferred first.
// We choose http/1.1 first because it's more widely supported.
cfg.alpn_protocols = vec![b"http/1.1".to_vec(), b"h2".to_vec()];

Ok(Arc::new(cfg))
}