diff --git a/src/clients/store/s3/mod.rs b/src/clients/store/s3/mod.rs index 46a54573..35146825 100644 --- a/src/clients/store/s3/mod.rs +++ b/src/clients/store/s3/mod.rs @@ -18,8 +18,9 @@ mod aws_helpers; use aws_helpers::*; -// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare -// senders in the pool manager and pass them over a queue to our worker tasks +const MB: usize = 1024 * 1024; + +// launch all worker tasks pub fn launch_tasks( runtime: &mut Runtime, config: Config, @@ -98,6 +99,8 @@ async fn task( let mut session_requests = 0; let mut session_start = Instant::now(); + let mut body = BytesMut::new(); + while RUNNING.load(Ordering::Relaxed) { if client.is_none() { if session_requests != 0 { @@ -107,8 +110,9 @@ async fn task( } CONNECT.increment(); - let c: Client<_, Full> = - Client::builder(TokioExecutor::new()).build(connector.clone()); + let c: Client<_, Full> = Client::builder(TokioExecutor::new()) + .http1_max_buf_size(16 * MB) + .build(connector.clone()); client = Some(c); @@ -146,7 +150,8 @@ async fn task( let status = response.status().as_u16(); // wait until we have a complete response body - let mut body = BytesMut::new(); + + body.truncate(0); let mut ttfb = None; @@ -162,10 +167,13 @@ async fn task( } else { STORE_RESPONSE_EX.increment(); STORE_GET_EX.increment(); + + continue; } } - let latency = start.elapsed(); + let latency = start.elapsed().as_nanos() as u64; + let ttfb = ttfb.map(|v| v.as_nanos() as u64).unwrap_or(latency); STORE_REQUEST_OK.increment(); @@ -176,12 +184,8 @@ async fn task( STORE_RESPONSE_FOUND.increment(); STORE_GET_KEY_FOUND.increment(); - let _ = - STORE_RESPONSE_LATENCY.increment(latency.as_nanos() as _); - - if let Some(ttfb) = ttfb { - let _ = STORE_RESPONSE_TTFB.increment(ttfb.as_nanos() as _); - } + let _ = STORE_RESPONSE_LATENCY.increment(latency); + let _ = STORE_RESPONSE_TTFB.increment(ttfb); } 404 => { STORE_RESPONSE_OK.increment(); @@ -189,8 +193,8 @@ async fn task( STORE_RESPONSE_NOT_FOUND.increment(); STORE_GET_KEY_NOT_FOUND.increment(); - let _ = - STORE_RESPONSE_LATENCY.increment(latency.as_nanos() as _); + let _ = STORE_RESPONSE_LATENCY.increment(latency); + let _ = STORE_RESPONSE_TTFB.increment(ttfb); } 503 => { STORE_RESPONSE_RATELIMITED.increment(); @@ -222,13 +226,34 @@ async fn task( let start = Instant::now(); match c.request(request).await { - Ok(response) => { + Ok(mut response) => { let status = response.status().as_u16(); // wait until we have a complete response body - let body = response.into_body().collect().await.unwrap().to_bytes(); - let latency = start.elapsed(); + body.truncate(0); + + let mut ttfb = None; + + while let Some(next) = response.frame().await { + if let Ok(frame) = next { + if let Some(chunk) = frame.data_ref() { + if ttfb.is_none() { + ttfb = Some(start.elapsed()); + } + + body.extend_from_slice(chunk); + } + } else { + STORE_RESPONSE_EX.increment(); + STORE_PUT_EX.increment(); + + continue; + } + } + + let latency = start.elapsed().as_nanos() as u64; + let ttfb = ttfb.map(|v| v.as_nanos() as u64).unwrap_or(latency); STORE_REQUEST_OK.increment(); @@ -239,8 +264,8 @@ async fn task( STORE_RESPONSE_FOUND.increment(); STORE_PUT_STORED.increment(); - let _ = - STORE_RESPONSE_LATENCY.increment(latency.as_nanos() as _); + let _ = STORE_RESPONSE_LATENCY.increment(latency); + let _ = STORE_RESPONSE_TTFB.increment(ttfb); } 503 => { STORE_RESPONSE_RATELIMITED.increment(); @@ -270,13 +295,33 @@ async fn task( let start = Instant::now(); match c.request(request).await { - Ok(response) => { + Ok(mut response) => { let status = response.status().as_u16(); // wait until we have a complete response body - let body = response.into_body().collect().await.unwrap().to_bytes(); + body.truncate(0); + + let mut ttfb = None; + + while let Some(next) = response.frame().await { + if let Ok(frame) = next { + if let Some(chunk) = frame.data_ref() { + if ttfb.is_none() { + ttfb = Some(start.elapsed()); + } + + body.extend_from_slice(chunk); + } + } else { + STORE_RESPONSE_EX.increment(); + STORE_PUT_EX.increment(); + + continue; + } + } - let latency = start.elapsed(); + let latency = start.elapsed().as_nanos() as u64; + let ttfb = ttfb.map(|v| v.as_nanos() as u64).unwrap_or(latency); STORE_REQUEST_OK.increment(); @@ -285,8 +330,8 @@ async fn task( STORE_RESPONSE_OK.increment(); STORE_DELETE_OK.increment(); - let _ = - STORE_RESPONSE_LATENCY.increment(latency.as_nanos() as _); + let _ = STORE_RESPONSE_LATENCY.increment(latency); + let _ = STORE_RESPONSE_TTFB.increment(ttfb); } 503 => { STORE_RESPONSE_RATELIMITED.increment();