diff --git a/src/main.rs b/src/main.rs index a965f4e8..8ffe212f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,9 @@ static WAIT: AtomicUsize = AtomicUsize::new(0); static METRICS_SNAPSHOT: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(Default::default()))); +// queue depth per client thread +static QUEUE_DEPTH: usize = 64; + fn main() { // custom panic hook to terminate whole process after unwinding std::panic::set_hook(Box::new(|s| { @@ -135,17 +138,30 @@ fn main() { }); } - let (client_sender, client_receiver) = - bounded(config.client().map(|c| c.threads() * 2).unwrap_or(1)); + let (client_sender, client_receiver) = bounded( + config + .client() + .map(|c| c.threads() * QUEUE_DEPTH) + .unwrap_or(1), + ); let (pubsub_sender, pubsub_receiver) = bounded( config .pubsub() - .map(|c| c.publisher_threads() * 2) + .map(|c| c.publisher_threads() * QUEUE_DEPTH) + .unwrap_or(1), + ); + let (store_sender, store_receiver) = bounded( + config + .storage() + .map(|c| c.threads() * QUEUE_DEPTH) + .unwrap_or(1), + ); + let (oltp_sender, oltp_receiver) = bounded( + config + .oltp() + .map(|c| c.threads() * QUEUE_DEPTH) .unwrap_or(1), ); - let (store_sender, store_receiver) = - bounded(config.storage().map(|c| c.threads() * 2).unwrap_or(1)); - let (oltp_sender, oltp_receiver) = bounded(config.oltp().map(|c| c.threads() * 2).unwrap_or(1)); output!("Protocol: {:?}", config.general().protocol()); diff --git a/src/workload/mod.rs b/src/workload/mod.rs index 0512011d..798ecee4 100644 --- a/src/workload/mod.rs +++ b/src/workload/mod.rs @@ -32,6 +32,9 @@ pub use store::StoreClientRequest; static SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0); +// a multiplier for the ratelimiter token bucket capacity +static BUCKET_CAPACITY: u64 = 64; + pub fn launch_workload( generator: Generator, config: &Config, @@ -114,7 +117,7 @@ impl Generator { Arc::new( Ratelimiter::builder(amount, interval) - .max_tokens(amount * 8) + .max_tokens(amount * BUCKET_CAPACITY) .build() .expect("failed to initialize ratelimiter"), ) @@ -993,7 +996,7 @@ pub async fn reconnect( Arc::new( Ratelimiter::builder(amount, interval) - .max_tokens(amount * 8) + .max_tokens(amount * BUCKET_CAPACITY) .build() .expect("failed to initialize ratelimiter"), )