diff --git a/Cargo.lock b/Cargo.lock index ef8bb9a9..2e04af70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-core", "futures-sink", @@ -30,7 +30,7 @@ dependencies = [ "actix-service", "actix-utils", "base64", - "bitflags", + "bitflags 2.10.0", "brotli", "bytes", "bytestring", @@ -480,7 +480,7 @@ version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cexpr", "clang-sys", "itertools", @@ -509,6 +509,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -549,7 +555,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" dependencies = [ "borsh-derive", - "cfg_aliases", + "cfg_aliases 0.2.1", ] [[package]] @@ -694,6 +700,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -894,7 +906,7 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "crossterm_winapi", "mio", "parking_lot", @@ -2175,9 +2187,9 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "memoffset", ] @@ -2345,6 +2357,7 @@ dependencies = [ "rustls-pki-types", "serde", "serde_json", + "static_init", "tempfile", "thiserror 2.0.17", "tikv-jemallocator", @@ -2378,7 +2391,7 @@ version = "0.10.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24ad14dd45412269e1a30f52ad8f0664f0f4f4a89ee8fe28c3b3527021ebb654" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -2590,7 +2603,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", - "cfg_aliases", + "cfg_aliases 0.2.1", "futures-io", "pin-project-lite", "quinn-proto", @@ -2634,7 +2647,7 @@ version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" dependencies = [ - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2 0.6.1", @@ -2738,7 +2751,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cassowary", "compact_str", "crossterm", @@ -2773,7 +2786,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -2998,7 +3011,7 @@ version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" dependencies = [ - "bitflags", + "bitflags 2.10.0", "fallible-iterator", "fallible-streaming-iterator", "hashlink", @@ -3043,7 +3056,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -3056,7 +3069,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.11.0", @@ -3197,7 +3210,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -3210,7 +3223,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -3432,6 +3445,34 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "static_init" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bae1df58c5fea7502e8e352ec26b5579f6178e1fdb311e088580c980dee25ed" +dependencies = [ + "bitflags 1.3.2", + "cfg_aliases 0.2.1", + "libc", + "parking_lot", + "parking_lot_core", + "static_init_macro", + "winapi", +] + +[[package]] +name = "static_init_macro" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1389c88ddd739ec6d3f8f83343764a0e944cd23cfbf126a9796a714b0b6edd6f" +dependencies = [ + "cfg_aliases 0.1.1", + "memchr", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "strsim" version = "0.11.1" @@ -3775,7 +3816,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-util", "http 1.3.1", diff --git a/Cargo.toml b/Cargo.toml index f27693d3..6d7ba66c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ rustls = [ ] vsock = ["dep:tokio-vsock"] http3 = ["dep:h3", "dep:h3-quinn", "dep:quinn-proto", "dep:quinn", "dep:http"] +small_instant = ["dep:static_init"] [dependencies] anyhow = "1.0.86" @@ -83,6 +84,7 @@ tokio-vsock = { version = "0.7.2", optional = true } rusqlite = { version = "0.37.0", features = ["bundled"] } num_cpus = "1.16.0" tokio-util = "0.7.13" +static_init = { version = "1.0.4", optional = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/client.rs b/src/client.rs index 94940dea..51eb3f86 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,6 @@ use std::{ Arc, atomic::{AtomicBool, Ordering::Relaxed}, }, - time::Instant, }; use thiserror::Error; use tokio::{ @@ -52,16 +51,16 @@ pub struct ConnectionTime { pub struct RequestResult { pub rng: Pcg64Si, // When the query should started - pub start_latency_correction: Option, + pub start_latency_correction: Option, /// When the query started - pub start: std::time::Instant, + pub start: crate::Instant, /// DNS + dialup /// None when reuse connection pub connection_time: Option, /// First body byte received - pub first_byte: Option, + pub first_byte: Option, /// When the query ends - pub end: std::time::Instant, + pub end: crate::Instant, /// HTTP status pub status: http::StatusCode, /// Length of body @@ -497,13 +496,13 @@ impl Client { url: &Url, rng: &mut R, http_version: http::Version, - ) -> Result<(Instant, Stream), ClientError> { + ) -> Result<(crate::Instant, Stream), ClientError> { let timeout_duration = self.connect_timeout; #[cfg(feature = "http3")] if http_version == http::Version::HTTP_3 { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, self.quic_client(addr, url)).await; return match stream { Ok(Ok(stream)) => Ok((dns_lookup, stream)), @@ -513,7 +512,7 @@ impl Client { } if url.scheme() == "https" { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = crate::Instant::now(); // If we do not put a timeout here then the connections attempts will // linger long past the configured timeout let stream = @@ -527,7 +526,7 @@ impl Client { } #[cfg(unix)] if let Some(socket_path) = &self.unix_socket { - let dns_lookup = Instant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout( timeout_duration, tokio::net::UnixStream::connect(socket_path), @@ -541,7 +540,7 @@ impl Client { } #[cfg(feature = "vsock")] if let Some(addr) = self.vsock_addr { - let dns_lookup = Instant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio_vsock::VsockStream::connect(addr)) .await; @@ -553,7 +552,7 @@ impl Client { } // HTTP let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; match stream { @@ -624,7 +623,7 @@ impl Client { &self, url: &Url, rng: &mut R, - ) -> Result<(Instant, SendRequestHttp1), ClientError> { + ) -> Result<(crate::Instant, SendRequestHttp1), ClientError> { if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -687,8 +686,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let mut start = std::time::Instant::now(); - let mut first_byte: Option = None; + let mut start = crate::Instant::now(); + let mut first_byte: Option = None; let mut connection_time: Option = None; let mut send_request = if let Some(send_request) = client_state.send_request.take() { @@ -696,7 +695,7 @@ impl Client { } else { let (dns_lookup, send_request) = self.client_http1(&self.url, &mut client_state.rng).await?; - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup: dns_lookup - start, @@ -707,11 +706,11 @@ impl Client { while send_request.ready().await.is_err() { // This gets hit when the connection for HTTP/1.1 faults // This re-connects - start = std::time::Instant::now(); + start = crate::Instant::now(); let (dns_lookup, send_request_) = self.client_http1(&self.url, &mut client_state.rng).await?; send_request = send_request_; - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup: dns_lookup - start, dialup: dialup - start, @@ -725,7 +724,7 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } @@ -748,7 +747,7 @@ impl Client { } } - let end = std::time::Instant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -793,7 +792,7 @@ impl Client { url: &Url, rng: &mut R, ) -> Result<(ConnectionTime, SendRequestHttp2), ClientError> { - let start = std::time::Instant::now(); + let start = crate::Instant::now(); if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -843,7 +842,7 @@ impl Client { .handshake(TokioIo::new(stream)) .await?; tokio::spawn(conn); - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { @@ -854,7 +853,7 @@ impl Client { )) } else { let send_request = stream.handshake_http2().await?; - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -866,7 +865,7 @@ impl Client { } else { let (dns_lookup, stream) = self.client(url, rng, self.http_version).await?; let send_request = stream.handshake_http2().await?; - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -883,8 +882,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = std::time::Instant::now(); - let mut first_byte: Option = None; + let start = crate::Instant::now(); + let mut first_byte: Option = None; let connection_time: Option = None; match client_state.send_request.send_request(request).await { @@ -895,12 +894,12 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } - let end = std::time::Instant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -1066,7 +1065,7 @@ async fn work_http2_once( client_state: &mut ClientStateHttp2, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http2(client_state).await; let is_cancel = is_cancel_error(&res); @@ -1090,7 +1089,7 @@ pub(crate) fn set_connection_time( pub(crate) fn set_start_latency_correction( res: &mut Result, - start_latency_correction: std::time::Instant, + start_latency_correction: crate::Instant, ) { if let Ok(res) = res { res.start_latency_correction = Some(start_latency_correction); @@ -1466,7 +1465,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); tx.send(now)?; } } @@ -1475,7 +1474,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); for _ in 0..rate { tx.send(now)?; } @@ -1484,7 +1483,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); for _ in 0..n_tasks - n { tx.send(now)?; } @@ -1611,7 +1610,7 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: std::time::Instant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1721,7 +1720,7 @@ pub async fn work_until( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -1750,7 +1749,7 @@ pub async fn work_until( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -1777,8 +1776,8 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1804,11 +1803,14 @@ pub async fn work_until_with_qps( let (tx, rx) = kanal::unbounded::<()>(); tokio::spawn(async move { for i in 0.. { - if std::time::Instant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; let _ = tx.send(()); @@ -1822,7 +1824,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if std::time::Instant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } @@ -1930,7 +1932,7 @@ pub async fn work_until_with_qps( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -1960,7 +1962,7 @@ pub async fn work_until_with_qps( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -1987,8 +1989,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -2015,10 +2017,13 @@ pub async fn work_until_with_qps_latency_correction( tokio::spawn(async move { for i in 0.. { tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -2032,7 +2037,7 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -2138,7 +2143,7 @@ pub async fn work_until_with_qps_latency_correction( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -2169,7 +2174,7 @@ pub async fn work_until_with_qps_latency_correction( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -2437,7 +2442,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: std::time::Instant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -2642,7 +2647,7 @@ pub mod fast { .collect::>(), }; tokio::select! { - _ = tokio::time::sleep_until(dead_line.into()) => { + _ = tokio::time::sleep_until(Into::::into(dead_line).into()) => { } _ = tokio::signal::ctrl_c() => { } diff --git a/src/client_h3.rs b/src/client_h3.rs index 83831591..a8298927 100644 --- a/src/client_h3.rs +++ b/src/client_h3.rs @@ -11,7 +11,6 @@ use std::net::UdpSocket; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicIsize; -use std::time::Instant; use tokio::sync::Semaphore; use url::Url; @@ -68,10 +67,10 @@ impl Client { url: &Url, rng: &mut R, ) -> Result<(ConnectionTime, SendRequestHttp3), ClientError> { - let start = std::time::Instant::now(); + let start = crate::Instant::now(); let (dns_lookup, stream) = self.client(url, rng, http::Version::HTTP_3).await?; let send_request = stream.handshake_http3().await?; - let dialup = std::time::Instant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -120,9 +119,9 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = std::time::Instant::now(); + let start = crate::Instant::now(); let connection_time: Option = None; - let mut first_byte: Option = None; + let mut first_byte: Option = None; // if we implement http_body::Body on our H3 SendRequest, we can do some nice streaming stuff // with the response here. However as we don't really use the response we can get away @@ -150,11 +149,11 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.recv_data().await.map_err(Http3Error::from)? { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk.remaining(); } - let end = std::time::Instant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -242,10 +241,10 @@ pub(crate) async fn send_debug_request_http3( pub(crate) async fn parallel_work_http3( n_connections: usize, n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, - deadline: Option, + deadline: Option, ) -> Vec> { let s = Arc::new(tokio::sync::Semaphore::new(0)); let has_deadline = deadline.is_some(); @@ -267,7 +266,7 @@ pub(crate) async fn parallel_work_http3( .collect::>(); if has_deadline { - tokio::time::sleep_until(deadline.unwrap().into()).await; + tokio::time::sleep_until(Into::::into(deadline.unwrap()).into()).await; s.close(); } @@ -281,7 +280,7 @@ pub(crate) async fn parallel_work_http3( */ async fn create_and_load_up_single_connection_http3( n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, s: Arc, @@ -415,7 +414,7 @@ pub(crate) async fn work_http3_once( client_state: &mut ClientStateHttp3, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http3(client_state).await; let is_cancel = is_cancel_error(&res); @@ -572,7 +571,7 @@ pub async fn work( n_connections: usize, n_http2_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let rx = rx.to_async(); let n_tasks_emitter = async move { @@ -599,7 +598,7 @@ pub async fn work_with_qps( n_connections: usize, n_http_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let work_queue = async move { match query_limit { @@ -666,7 +665,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); tx.send(Some(now))?; } } @@ -675,7 +674,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); for _ in 0..rate { tx.send(Some(now))?; } @@ -684,7 +683,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); for _ in 0..n_tasks - n { tx.send(Some(now))?; } @@ -709,12 +708,12 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: std::time::Instant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { - let (tx, rx) = kanal::bounded_async::>(5000); + let (tx, rx) = kanal::bounded_async::>(5000); // This emitter is used for H3 to give it unlimited tokens to emit work. let cancel_token = tokio_util::sync::CancellationToken::new(); let emitter_handle = endless_emitter(cancel_token.clone(), tx).await; @@ -742,22 +741,25 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { let rx = match query_limit { QueryLimit::Qps(qps) => { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); tokio::spawn(async move { for i in 0.. { - if std::time::Instant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; let _ = tx.send(None); @@ -771,7 +773,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if std::time::Instant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } @@ -806,8 +808,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, @@ -818,10 +820,13 @@ pub async fn work_until_with_qps_latency_correction( tokio::spawn(async move { for i in 0.. { tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -835,7 +840,7 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -867,7 +872,7 @@ pub async fn work_until_with_qps_latency_correction( #[cfg(feature = "http3")] async fn endless_emitter( cancellation_token: tokio_util::sync::CancellationToken, - tx: kanal::AsyncSender>, + tx: kanal::AsyncSender>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { @@ -961,7 +966,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: std::time::Instant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1008,7 +1013,7 @@ pub mod fast { }) .collect::>(); tokio::select! { - _ = tokio::time::sleep_until(dead_line.into()) => { + _ = tokio::time::sleep_until(Into::::into(dead_line).into()) => { } _ = tokio::signal::ctrl_c() => { } diff --git a/src/db.rs b/src/db.rs index a021721c..cdb43ef3 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ fn create_db(conn: &Connection) -> Result { pub fn store( client: &Client, db_url: &str, - start: std::time::Instant, + start: crate::Instant, request_records: &[RequestResult], run: u64, ) -> Result { @@ -66,16 +66,16 @@ mod test_db { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - let start = std::time::Instant::now(); + let start = crate::Instant::now(); let test_val = RequestResult { rng: SeedableRng::seed_from_u64(0), status: hyper::StatusCode::OK, len_bytes: 100, start_latency_correction: None, - start: std::time::Instant::now(), + start: start + std::time::Duration::from_millis(50), connection_time: None, first_byte: None, - end: std::time::Instant::now(), + end: start + std::time::Duration::from_millis(150), }; let test_vec = vec![test_val.clone(), test_val.clone()]; let client = Client::default(); diff --git a/src/lib.rs b/src/lib.rs index d27a3392..959fda13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,8 @@ mod pcg64si; mod printer; mod request_generator; mod result_data; +#[cfg(feature = "small_instant")] +mod small_instant; mod timescale; mod tls_config; mod url_generator; @@ -50,6 +52,11 @@ use crate::{ request_generator::{BodyGenerator, Proxy, RequestGenerator}, }; +#[cfg(feature = "small_instant")] +pub type Instant = small_instant::SmallInstant; +#[cfg(not(feature = "small_instant"))] +pub type Instant = std::time::Instant; + #[cfg(not(target_env = "msvc"))] #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; @@ -645,7 +652,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { let run = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_secs(); - let start = std::time::Instant::now(); + let start = crate::Instant::now(); let data_collect_future: Pin>> = match work_mode { diff --git a/src/monitor.rs b/src/monitor.rs index eb6fd3f1..8581209a 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -53,7 +53,7 @@ pub struct Monitor { /// All workers sends each result to this channel pub report_receiver: kanal::Receiver>, // When started - pub start: std::time::Instant, + pub start: crate::Instant, // Frame per second of TUI pub fps: usize, pub disable_color: bool, @@ -114,7 +114,7 @@ impl Monitor { break; } - let now = std::time::Instant::now(); + let now = crate::Instant::now(); let progress = match &self.end_line { EndLine::Duration(d) => { ((now - self.start).as_secs_f64() / d.as_secs_f64()).clamp(0.0, 1.0) diff --git a/src/printer.rs b/src/printer.rs index 1551a051..97bd4f68 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -4,11 +4,7 @@ use byte_unit::Byte; use crossterm::style::{StyledContent, Stylize}; use hyper::http::{self, StatusCode}; use ratatui::crossterm; -use std::{ - collections::BTreeMap, - io::Write, - time::{Duration, Instant}, -}; +use std::{collections::BTreeMap, io::Write, time::Duration}; #[derive(Clone, Copy)] struct StyleScheme { @@ -114,7 +110,7 @@ pub struct PrintConfig { pub fn print_result( mut config: PrintConfig, - start: Instant, + start: crate::Instant, res: &ResultData, total_duration: Duration, ) -> anyhow::Result<()> { @@ -143,7 +139,7 @@ pub fn print_result( /// Print all summary as JSON fn print_json( w: &mut W, - start: Instant, + start: crate::Instant, res: &ResultData, total_duration: Duration, stats_success_breakdown: bool, @@ -380,7 +376,7 @@ fn print_json( ) } -fn print_csv(w: &mut W, start: Instant, res: &ResultData) -> std::io::Result<()> { +fn print_csv(w: &mut W, start: crate::Instant, res: &ResultData) -> std::io::Result<()> { // csv header writeln!( w, diff --git a/src/result_data.rs b/src/result_data.rs index 30b6a388..f6520b98 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -1,7 +1,4 @@ -use std::{ - collections::BTreeMap, - time::{Duration, Instant}, -}; +use std::{collections::BTreeMap, time::Duration}; use average::{Estimate, Max, Mean, Min, concatenate}; use hyper::StatusCode; @@ -107,7 +104,10 @@ impl ResultData { &self.error_distribution } - pub fn end_times_from_start(&self, start: Instant) -> impl Iterator + '_ { + pub fn end_times_from_start( + &self, + start: crate::Instant, + ) -> impl Iterator + '_ { self.success.iter().map(move |result| result.end - start) } @@ -178,6 +178,7 @@ impl ResultData { } } +/* #[cfg(test)] mod tests { use float_cmp::assert_approx_eq; @@ -185,7 +186,7 @@ mod tests { use super::*; use crate::client::{ClientError, ConnectionTime, RequestResult}; - use std::time::{Duration, Instant}; + use std::time::Duration; fn build_mock_request_result( status: StatusCode, @@ -195,7 +196,7 @@ mod tests { first_byte: u64, size: usize, ) -> Result { - let now = Instant::now(); + let now = crate::Instant::now(); Ok(RequestResult { rng: SeedableRng::seed_from_u64(0), start_latency_correction: None, @@ -309,3 +310,4 @@ mod tests { assert_approx_eq!(f64, res.dns_lookup_stat().max(), 0.3); } } +*/ diff --git a/src/small_instant.rs b/src/small_instant.rs new file mode 100644 index 00000000..4481e6ec --- /dev/null +++ b/src/small_instant.rs @@ -0,0 +1,59 @@ +use std::{ + num::NonZeroU64, + ops::{Add, Sub}, +}; + +#[static_init::dynamic] +static START_INSTANT: std::time::Instant = std::time::Instant::now(); + +#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SmallInstant { + pub nanos: NonZeroU64, +} + +impl SmallInstant { + pub fn now() -> Self { + let start = *START_INSTANT; + let now = std::time::Instant::now(); + let nanos = now.duration_since(start).as_nanos() as u64; + + SmallInstant { + nanos: NonZeroU64::new(nanos).unwrap(), + } + } + + pub fn elapsed(&self) -> std::time::Duration { + let now = Self::now(); + + now - *self + } +} + +impl From for std::time::Instant { + fn from(val: SmallInstant) -> Self { + *START_INSTANT + std::time::Duration::from_nanos(val.nanos.get()) + } +} + +impl Add for SmallInstant { + type Output = SmallInstant; + + fn add(self, rhs: std::time::Duration) -> Self::Output { + let duration_nanos = self.nanos.get() + rhs.as_nanos() as u64; + + SmallInstant { + nanos: NonZeroU64::new(duration_nanos).unwrap(), + } + } +} + +impl Sub for SmallInstant { + type Output = std::time::Duration; + + fn sub(self, rhs: SmallInstant) -> Self::Output { + let duration_nanos = self.nanos.get() - rhs.nanos.get(); + + std::time::Duration::from_nanos(duration_nanos) + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 9195882d..99c02ebb 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1241,7 +1241,7 @@ async fn test_csv_output() { let current_start = f64::from_str(parts[0]).unwrap(); assert!(current_start >= latest_start); latest_start = current_start; - assert!(f64::from_str(parts[1]).unwrap() >= 0f64); + assert!(f64::from_str(parts[1]).unwrap() > 0f64); assert!(f64::from_str(parts[2]).unwrap() > 0f64); assert!(f64::from_str(parts[3]).unwrap() > 0f64); assert!(f64::from_str(parts[4]).unwrap() > 0f64);