From 109f40e328d556c84de9726aec85e1f80180968b Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Tue, 14 Jan 2025 11:30:33 +0100 Subject: [PATCH] feat: add HTTP admin endpoint Signed-off-by: Roman Volosatovs --- Cargo.lock | 5 ++++ Cargo.toml | 9 +++++++ src/main.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 6276726e..774a1807 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3579,9 +3579,14 @@ dependencies = [ "anyhow", "async-nats", "async-trait", + "bytes", "chrono", "clap", "futures", + "http 1.2.0", + "http-body-util", + "hyper 1.5.2", + "hyper-util", "nkeys", "opentelemetry 0.17.0", "opentelemetry-otlp", diff --git a/Cargo.toml b/Cargo.toml index 95a88ef2..294dea84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,8 +24,13 @@ members = ["crates/*"] anyhow = { workspace = true } async-nats = { workspace = true } async-trait = { workspace = true } +bytes = { workspace = true } clap = { workspace = true, features = ["derive", "cargo", "env"] } futures = { workspace = true } +http = { workspace = true, features = ["std"] } +http-body-util = { workspace = true } +hyper = { workspace = true } +hyper-util = { workspace = true, features = ["server"] } nkeys = { workspace = true } # One version back to avoid clashes with 0.10 of otlp opentelemetry = { workspace = true, features = ["rt-tokio"] } @@ -53,6 +58,10 @@ chrono = "0.4" clap = { version = "4", features = ["derive", "cargo", "env"] } cloudevents-sdk = "0.7" futures = "0.3" +http = { version = "1", default-features = false } +http-body-util = { version = "0.1", default-features = false } +hyper = { version = "1", default-features = false } +hyper-util = { version = "0.1", default-features = false } indexmap = { version = "2", features = ["serde"] } jsonschema = "0.17" lazy_static = "1" diff --git a/src/main.rs b/src/main.rs index 2d82b6cf..1f389517 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,19 @@ +use core::net::SocketAddr; + use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use anyhow::Context as _; use async_nats::jetstream::{stream::Stream, Context}; +use bytes::Bytes; use clap::Parser; +use hyper_util::rt::{TokioExecutor, TokioIo}; use nats::StreamPersistence; +use tokio::net::TcpListener; +use tokio::spawn; use tokio::sync::Semaphore; -use tracing::log::debug; +use tracing::{debug, error}; use wadm_types::api::DEFAULT_WADM_TOPIC_PREFIX; use wadm::{ @@ -238,6 +245,10 @@ struct Args { hide = true )] max_wasmbus_event_stream_bytes: i64, + + #[clap(long = "http-admin", env = "WADM_HTTP_ADMIN")] + /// HTTP administration endpoint address + http_admin: Option, } #[tokio::main] @@ -456,6 +467,60 @@ async fn main() -> anyhow::Result<()> { ManifestNotifier::new(wadm_event_prefix, context), ) .await?; + + if let Some(addr) = args.http_admin { + let socket = TcpListener::bind(addr) + .await + .context("failed to bind on HTTP administation endpoint")?; + let svc = hyper::service::service_fn(move |req| { + const OK: &str = r#"{"status":"ok"}"#; + async move { + let (http::request::Parts { method, uri, .. }, _) = req.into_parts(); + match (method.as_str(), uri.path()) { + ("HEAD", "/livez") => Ok(http::Response::default()), + ("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new( + Bytes::from(OK), + ))), + (method, "/livez") => http::Response::builder() + .status(http::StatusCode::METHOD_NOT_ALLOWED) + .body(http_body_util::Full::new(Bytes::from(format!( + "method `{method}` not supported for path `/livez`" + )))), + ("HEAD", "/readyz") => Ok(http::Response::default()), + ("GET", "/readyz") => Ok(http::Response::new(http_body_util::Full::new( + Bytes::from(OK), + ))), + (method, "/readyz") => http::Response::builder() + .status(http::StatusCode::METHOD_NOT_ALLOWED) + .body(http_body_util::Full::new(Bytes::from(format!( + "method `{method}` not supported for path `/readyz`" + )))), + (.., path) => http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body(http_body_util::Full::new(Bytes::from(format!( + "unknown endpoint `{path}`" + )))), + } + } + }); + let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + spawn(async move { + loop { + let stream = match socket.accept().await { + Ok((stream, _)) => stream, + Err(err) => { + error!(?err, "failed to accept HTTP administration connection"); + continue; + } + }; + let svc = svc.clone(); + if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await { + error!(?err, "failed to serve HTTP administration connection"); + } + } + }); + } + tokio::select! { res = server.serve() => { res?