-
Notifications
You must be signed in to change notification settings - Fork 150
(WIP) chore: enable http/1.1 for rust mvtx daemon server #3180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 18 commits
bf6be90
ee83692
f3459f4
9863e40
ca861b6
68b94a3
8d2a20b
55b89d0
3a39b05
b18313e
b53d25b
c0dcb00
2bf1949
b5b2022
cf69384
52e813e
c2d13b4
66910f5
c9813f1
9ff9415
0a939e7
2476929
c2e56af
4d33d0c
f301ca3
ed5823d
53eb0f6
234c36a
1621a13
6dc7589
1f5578c
cba8e26
624a99b
7ccc22b
0581ee8
88f777c
d1e2f36
43f482a
0010d02
a8fec31
88156d3
d951804
efdb10e
24ea2bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,10 @@ | ||
| use bytes::Bytes; | ||
| use http::{Request as HttpRequest, Response as HttpResponse, StatusCode}; | ||
| use http_body_util::Full; | ||
| use hyper::body::Incoming; | ||
| use hyper::server::conn::http1; | ||
| use hyper::service::service_fn; | ||
| use hyper_util::rt::TokioIo; | ||
| use numaflow_pb::servers::mvtxdaemon::mono_vertex_daemon_service_server::{ | ||
| MonoVertexDaemonService, MonoVertexDaemonServiceServer, | ||
| }; | ||
|
|
@@ -8,15 +15,24 @@ use numaflow_pb::servers::mvtxdaemon::{ | |
| use rcgen::{ | ||
| CertificateParams, DistinguishedName, DnType, ExtendedKeyUsagePurpose, KeyPair, KeyUsagePurpose, | ||
| }; | ||
| use rustls::ServerConfig; | ||
| use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; | ||
| use std::collections::HashMap; | ||
| use std::convert::Infallible; | ||
| use std::error::Error; | ||
| use std::net::SocketAddr; | ||
| use std::result::Result; | ||
| use std::sync::Arc; | ||
| use time::{Duration, OffsetDateTime}; | ||
| use tonic::transport::{Identity, Server, ServerTlsConfig}; | ||
| use tokio::net::TcpListener; | ||
| use tokio::sync::mpsc; | ||
| use tokio_rustls::TlsAcceptor; | ||
| use tokio_stream::StreamExt; | ||
| use tokio_stream::wrappers::ReceiverStream; | ||
| use tonic::transport::Server; | ||
| use tonic::{Request, Response, Status}; | ||
| use tracing::info; | ||
| use tracing::{info, warn}; | ||
|
|
||
| #[derive(Debug, Default)] | ||
| pub struct MvtxDaemonService; | ||
|
|
||
| #[tonic::async_trait] | ||
|
|
@@ -25,8 +41,6 @@ impl MonoVertexDaemonService for MvtxDaemonService { | |
| &self, | ||
| _: Request<()>, | ||
| ) -> Result<Response<GetMonoVertexMetricsResponse>, Status> { | ||
| info!("Received GetMonoVertexMetrics"); | ||
|
|
||
| let mock_processing_rates = HashMap::from([ | ||
| ("default".to_string(), 67.0), | ||
| ("1m".to_string(), 10.0), | ||
|
|
@@ -56,8 +70,6 @@ impl MonoVertexDaemonService for MvtxDaemonService { | |
| &self, | ||
| _: Request<()>, | ||
| ) -> Result<Response<GetMonoVertexStatusResponse>, Status> { | ||
| info!("Received GetMonoVertexStatus"); | ||
|
|
||
| let mock_resp = GetMonoVertexStatusResponse { | ||
| status: Some(MonoVertexStatus { | ||
| status: "mock_status".to_string(), | ||
|
|
@@ -73,8 +85,6 @@ impl MonoVertexDaemonService for MvtxDaemonService { | |
| &self, | ||
| _: Request<GetMonoVertexErrorsRequest>, | ||
| ) -> Result<Response<GetMonoVertexErrorsResponse>, Status> { | ||
| info!("Received GetMonoVertexErrors"); | ||
|
|
||
| let mock_resp = GetMonoVertexErrorsResponse { | ||
| errors: vec![ReplicaErrors { | ||
| replica: "mock_replica".to_string(), | ||
|
|
@@ -92,18 +102,117 @@ const DAEMON_SERVICE_PORT: u16 = 4327; | |
| pub async fn run_monovertex(mvtx_name: String) -> Result<(), Box<dyn Error>> { | ||
| info!("MonoVertex name is {}", mvtx_name); | ||
|
|
||
| let addr = format!("[::]:{}", DAEMON_SERVICE_PORT).parse()?; | ||
| // Create a TCP listener that can listen to both h2 and http 1.1. | ||
| let addr: SocketAddr = format!("[::]:{}", DAEMON_SERVICE_PORT).parse()?; | ||
| let tcp_listener = TcpListener::bind(addr).await?; | ||
| let tls_config = generate_self_signed_tls_config()?; | ||
| let tls_acceptor = TlsAcceptor::from(tls_config); | ||
|
|
||
| // Create two channels, one serving gRPC requests, the other HTTP. | ||
| // Given the request rate a daemon server expect to receive, a buffer size of 1000 should be sufficent. | ||
| let (grpc_tx, grpc_rx) = mpsc::channel(1000); | ||
| let (http_tx, mut http_rx) = mpsc::channel(1000); | ||
|
|
||
| // Start a thread to accept requests. | ||
| let _accept_req_task = tokio::spawn(async move { | ||
qhuai marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| loop { | ||
| // Accept a connection. | ||
| let (tcp, peer_addr) = match tcp_listener.accept().await { | ||
| Ok(v) => v, | ||
| Err(e) => { | ||
| warn!(error = %e, "Failed to accept a TCP connection"); | ||
| continue; | ||
| } | ||
| }; | ||
| // Handle the new connection. | ||
| // Start a new thread so that we don't block accepting other connections. | ||
|
|
||
| let grpc_sender = grpc_tx.clone(); | ||
| let http_sender = http_tx.clone(); | ||
| let acceptor = tls_acceptor.clone(); | ||
|
|
||
| tokio::spawn(async move { | ||
| let stream = match acceptor.accept(tcp).await { | ||
| Ok(s) => s, | ||
| Err(e) => { | ||
| warn!(peer_addr = %peer_addr, error = %e, "TLS handshake failed."); | ||
| // TLS handshake failed, skip handling this connection. | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let alpn = stream | ||
| .get_ref() | ||
| .1 | ||
| .alpn_protocol() | ||
| .map(|p| String::from_utf8_lossy(p).into_owned()); | ||
|
|
||
| let service = MvtxDaemonService; | ||
| let identity = generate_self_signed_identity()?; | ||
| let tls = ServerTlsConfig::new().identity(identity); | ||
| match alpn.as_deref() { | ||
| Some("http/1.1") => { | ||
| // Send to the HTTP channel. | ||
| let _ = http_sender.send(stream).await; | ||
| } | ||
| Some("h2") => { | ||
| // Send to the gRPC channel. | ||
| let _ = grpc_sender.send(stream).await; | ||
| } | ||
| _ => { | ||
| // Send to the HTTP channel by default. | ||
| // This is because most of the time, HTTP is used for communication. | ||
| // On Numaflow, if a client is sending a gRPC request, the h2 protocol is explicitly used. | ||
| let _ = http_sender.send(stream).await; | ||
|
||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| Server::builder() | ||
| .tls_config(tls)? | ||
| .add_service(MonoVertexDaemonServiceServer::new(service)) | ||
| .serve(addr) | ||
| .await?; | ||
| // Start a thread to serve gRPC requests. | ||
| let grpc_server_task = tokio::spawn(async move { | ||
| let grpc_service = MonoVertexDaemonServiceServer::new(MvtxDaemonService); | ||
| let incoming_stream = ReceiverStream::new(grpc_rx).map(Ok::<_, std::io::Error>); | ||
| Server::builder() | ||
| .add_service(grpc_service) | ||
| .serve_with_incoming(incoming_stream) | ||
| .await | ||
| }); | ||
|
|
||
| // Start a thread to serve HTTP requests. | ||
| let _http_server_task = tokio::spawn(async move { | ||
| // TODO - _svc_for_http will be used for serving HTTP requests. | ||
| let _svc_for_http = MvtxDaemonService; | ||
| while let Some(stream) = http_rx.recv().await { | ||
| tokio::spawn(async move { | ||
| let svc = service_fn(|req: HttpRequest<Incoming>| async move { | ||
| let method = req.method().clone(); | ||
| let path = req.uri().path().to_string(); | ||
|
|
||
| let resp = match (method.as_str(), path.as_str()) { | ||
| ("GET", "/readyz" | "/livez") => HttpResponse::builder() | ||
| .status(StatusCode::NO_CONTENT) | ||
| .body(Full::new(Bytes::new())) | ||
| .unwrap(), | ||
| // TODO - add remaining endpoints. | ||
| _ => HttpResponse::builder() | ||
| .status(StatusCode::NOT_FOUND) | ||
| .body(Full::new(Bytes::new())) | ||
| .unwrap(), | ||
| }; | ||
|
|
||
| // Every error case is translated to a corresponding Response, hence infallible. | ||
| Ok::<HttpResponse<Full<Bytes>>, Infallible>(resp) | ||
| }); | ||
| let io = TokioIo::new(stream); | ||
| let _ = http1::Builder::new().serve_connection(io, svc).await; | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| // Wait for the gRPC server to finish. | ||
| let grpc_res = grpc_server_task.await?; | ||
| grpc_res?; | ||
|
|
||
| // TODO - Gracefully shutdown. | ||
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -113,7 +222,7 @@ pub async fn run_pipeline(pipeline_name: String) -> Result<(), Box<dyn Error>> { | |
| Ok(()) | ||
| } | ||
|
|
||
| fn generate_self_signed_identity() -> Result<Identity, Box<dyn Error>> { | ||
| fn generate_self_signed_tls_config() -> Result<Arc<ServerConfig>, Box<dyn Error>> { | ||
| let mut params = CertificateParams::new(vec!["localhost".to_string()])?; | ||
|
|
||
| let mut dn = DistinguishedName::new(); | ||
|
|
@@ -134,5 +243,18 @@ fn generate_self_signed_identity() -> Result<Identity, Box<dyn Error>> { | |
| let signing_key = KeyPair::generate()?; | ||
| let cert = params.self_signed(&signing_key)?; | ||
|
|
||
| Ok(Identity::from_pem(cert.pem(), signing_key.serialize_pem())) | ||
| let cert_der = cert.der().clone(); | ||
| let key_der = PrivatePkcs8KeyDer::from(signing_key.serialize_der()); | ||
| let key_der = PrivateKeyDer::from(key_der); | ||
|
|
||
| let mut cfg = ServerConfig::builder() | ||
| .with_no_client_auth() | ||
| .with_single_cert(vec![cert_der], key_der)?; | ||
|
|
||
| // Serve both http and gRPC. | ||
| // Note: order matters, most preferred first. | ||
| // We choose http/1.1 first because it's more widely supported. | ||
| cfg.alpn_protocols = vec![b"http/1.1".to_vec(), b"h2".to_vec()]; | ||
|
|
||
| Ok(Arc::new(cfg)) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we split this more functions like
start_grpc_server(),start_http_server(),serve_requests()? to make it more readable also makes it easy to review the code.