diff --git a/Cargo.toml b/Cargo.toml
index 9279557..21cce21 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -103,3 +103,4 @@ nix = "0.30"
 parking_lot_core = "0.9.2"
 thiserror = "2.0.12"
 tower = "0.5.2"
+criterion = "0.5"
diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml
index e068dde..feb2edd 100644
--- a/foundations/Cargo.toml
+++ b/foundations/Cargo.toml
@@ -254,6 +254,7 @@ neli = { workspace = true, optional = true }
 neli-proc-macros = { workspace = true, optional = true }
 
 [dev-dependencies]
+criterion = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true, features = ["rc"] }
 tempfile = { workspace = true }
@@ -262,6 +263,10 @@ ipnetwork = { workspace = true }
 nix = { workspace = true , features = ["fs"] }
 tracing-subscriber = { workspace = true }
 
+[[bench]]
+name = "ratelimit"
+harness = false
+
 [build-dependencies]
 bindgen = { workspace = true, features = ["runtime"], optional = true }
 cc = { workspace = true, optional = true }
diff --git a/foundations/benches/ratelimit.rs b/foundations/benches/ratelimit.rs
new file mode 100644
index 0000000..0d543ae
--- /dev/null
+++ b/foundations/benches/ratelimit.rs
@@ -0,0 +1,113 @@
+use std::sync::Arc;
+
+use criterion::{Bencher, BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
+use foundations::{RateLimiter, RateLimiterConfig};
+
+fn bench_uncontended(c: &mut Criterion, group_name: &str, bench_name: &str, rate: f64, burst: u64) {
+    let mut group = c.benchmark_group(group_name);
+    group.bench_function(bench_name, |b| {
+        let config = RateLimiterConfig::new(rate, burst);
+        let limiter = RateLimiter::new(&config);
+        b.iter(|| black_box(limiter.is_ratelimited()))
+    });
+    group.finish();
+}
+
+fn bench_contended(c: &mut Criterion, group_name: &str, bench_name: &str, rate: f64, burst: u64) {
+    let mut group = c.benchmark_group(group_name);
+    let config: &'static RateLimiterConfig =
+        Box::leak(Box::new(RateLimiterConfig::new(rate, burst)));
+
+    for num_threads in [2, 4, 8, 16] {
+        group.bench_with_input(
+            BenchmarkId::new(bench_name, num_threads),
+            &num_threads,
+            |b, &num_threads| {
+                run_contended_iter(b, config, num_threads);
+            },
+        );
+    }
+
+    group.finish();
+}
+
+fn run_contended_iter(b: &mut Bencher, config: &'static RateLimiterConfig, num_threads: usize) {
+    let limiter = Arc::new(RateLimiter::new(config));
+
+    b.iter_custom(|iters| {
+        let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1));
+
+        let handles: Vec<_> = (0..num_threads)
+            .map(|_| spawn_thread(Arc::clone(&limiter), Arc::clone(&barrier), iters))
+            .collect();
+
+        // All threads ready, start timing
+        barrier.wait();
+
+        let start = std::time::Instant::now();
+        handles.into_iter().for_each(|h| h.join().unwrap());
+        start.elapsed()
+    });
+}
+
+fn spawn_thread(
+    limiter: Arc<RateLimiter<'static>>,
+    barrier: Arc<std::sync::Barrier>,
+    iters: u64,
+) -> std::thread::JoinHandle<()> {
+    std::thread::spawn(move || {
+        barrier.wait();
+        for _ in 0..iters {
+            let _ = black_box(limiter.is_ratelimited());
+        }
+    })
+}
+
+fn bench_uncontended_low_rate(c: &mut Criterion) {
+    bench_uncontended(
+        c,
+        "ratelimiter/uncontended_low_rate",
+        "1_per_min_1_burst",
+        1.0 / 60.0,
+        1,
+    );
+}
+
+fn bench_contended_low_rate(c: &mut Criterion) {
+    bench_contended(
+        c,
+        "ratelimiter/contended_low_rate",
+        "1_per_min_1_burst",
+        1.0 / 60.0,
+        1,
+    );
+}
+
+fn bench_uncontended_high_rate(c: &mut Criterion) {
+    bench_uncontended(
+        c,
+        "ratelimiter/uncontended_high_rate",
+        "1000rps_10_burst",
+        1_000.0,
+        10,
+    );
+}
+
+fn bench_contended_high_rate(c: &mut Criterion) {
+    bench_contended(
+        c,
+        "ratelimiter/contended_high_rate",
+        "1000rps_10_burst",
+        1_000.0,
+        10,
+    );
+}
+
+criterion_group!(
+    benches,
+    bench_uncontended_low_rate,
+    bench_contended_low_rate,
+    bench_uncontended_high_rate,
+    bench_contended_high_rate
+);
+criterion_main!(benches);
diff --git a/foundations/src/lib.rs b/foundations/src/lib.rs
index 8599d5d..b2f0465 100644
--- a/foundations/src/lib.rs
+++ b/foundations/src/lib.rs
@@ -67,8 +67,11 @@
 #![warn(missing_docs)]
 #![cfg_attr(foundations_docsrs, feature(doc_cfg))]
 
+mod ratelimit;
 mod utils;
 
+pub use ratelimit::{RateLimiter, RateLimiterConfig};
+
 pub mod addr;
 
 #[cfg(feature = "cli")]
diff --git a/foundations/src/ratelimit.rs b/foundations/src/ratelimit.rs
new file mode 100644
index 0000000..4fddd6e
--- /dev/null
+++ b/foundations/src/ratelimit.rs
@@ -0,0 +1,285 @@
+use std::{
+    sync::{
+        LazyLock,
+        atomic::{AtomicU64, Ordering},
+    },
+    time::Instant,
+};
+
+static EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now);
+
+/// A rate limiter using the Generic Cell Rate Algorithm (GCRA).
+///
+/// GCRA is effectively a "leaky bucket" or "token bucket" algorithm that tracks
+/// a theoretical arrival time (TAT) for requests. Each request advances the TAT
+/// by a fixed interval. If the TAT would exceed the current time plus a
+/// tolerance window, the request is rate limited.
+///
+/// This approach is memory-efficient (only stores a single timestamp) and
+/// provides smooth rate limiting without the burstiness that can occur with
+/// fixed time windows.
+pub struct RateLimiter<'a> {
+    /// When the next request should arrive if all requests arrive in order with
+    /// perfect spacing (aka TAT).
+    arrival_time: AtomicU64,
+    config: &'a RateLimiterConfig,
+}
+
+/// Config for a [`RateLimiter`].
+pub struct RateLimiterConfig {
+    /// Nanoseconds between each request.
+    request_spacing_ns: u64,
+    /// How many nanoseconds from the current time we can push the arrival_time.
+    /// This defines how much burst we allow. This is `request_spacing_ns *
+    /// (burst + 1)` to allow for the base rate plus extra burst capacity.
+    tolerance_ns: u64,
+}
+
+impl RateLimiterConfig {
+    /// Create a new [`RateLimiterConfig`] with the given rate and burst. A rate
+    /// of `1.0` and burst of `3` will ratelimit to 1 rps and allow up to 4
+    /// requests (1 base + 3 burst) before limiting.
+    pub const fn new(rate: f64, burst: u64) -> Self {
+        const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
+        let request_spacing_ns = (NANOS_PER_SECOND / rate) as u64;
+
+        Self {
+            request_spacing_ns,
+            tolerance_ns: request_spacing_ns * (burst + 1),
+        }
+    }
+}
+
+impl<'a> RateLimiter<'a> {
+    /// Create a new [`RateLimiter`] with the given config.
+    pub const fn new(config: &'a RateLimiterConfig) -> Self {
+        Self {
+            arrival_time: AtomicU64::new(0),
+            config,
+        }
+    }
+
+    /// [`RateLimiter::is_ratelimited`] returns `true` if the caller is
+    /// ratelimited and `false` if not.
+    pub fn is_ratelimited(&self) -> bool {
+        self.ratelimited_at(now_ns())
+    }
+
+    fn ratelimited_at(&self, now: u64) -> bool {
+        let result =
+            self.arrival_time
+                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |arrival_time| {
+                    let new_arrival_time = arrival_time.max(now) + self.config.request_spacing_ns;
+
+                    if new_arrival_time > now + self.config.tolerance_ns {
+                        // If the new arrival time is too far ahead, we don't
+                        // update (`None`) which will cause us to be
+                        // ratelimited.
+                        None
+                    } else {
+                        // The new time is valid. Attempt to update it.
+                        Some(new_arrival_time)
+                    }
+                });
+
+        result.is_err()
+    }
+}
+
+fn now_ns() -> u64 {
+    EPOCH.elapsed().as_nanos() as u64
+}
+
+/// Conditionally executes a block of code based on a per-callsite rate limiter.
+///
+/// Each invocation of this macro creates a unique static rate limiter at the call site.
+/// The code block will only execute if the rate limiter allows it (i.e., not rate limited).
+///
+/// # Arguments
+///
+/// * `rate` - The rate limit in requests per second (e.g., `1.0` for 1 request/second)
+/// * `burst` - The extra burst capacity (number of requests allowed beyond the base rate before rate limiting kicks in)
+///
+/// # Example
+///
+/// ```
+/// use foundations::ratelimit;
+///
+/// // Allow 1 request per second with burst of 3
+/// ratelimit!(rate=1.0, burst=3, {
+///     println!("This will be rate limited");
+/// });
+/// ```
+#[macro_export]
+macro_rules! ratelimit {
+    (rate=$rate:expr, burst=$burst:expr, { $($body:tt)* }) => {
+        {
+            static __RATELIMIT_CONFIG: $crate::RateLimiterConfig =
+                $crate::RateLimiterConfig::new($rate, $burst);
+            static __RATELIMITER: $crate::RateLimiter =
+                $crate::RateLimiter::new(&__RATELIMIT_CONFIG);
+
+            if !__RATELIMITER.is_ratelimited() {
+                $($body)*
+            }
+        }
+    };
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const NANOS_PER_SECOND: u64 = 1_000_000_000;
+
+    /// Executes a series of rate limiter tests in order.
+    /// Each test is a tuple of (timestamp_ns, expected_result).
+    /// Panics with the index of the first failing test.
+    fn test_ratelimiter(limiter: RateLimiter, tests: &[(u64, bool)]) {
+        for (i, (time_ns, expected)) in tests.iter().enumerate() {
+            let result = limiter.ratelimited_at(*time_ns);
+            assert_eq!(
+                result, *expected,
+                "Test index {} failed at t={}ns: expected {}, got {}",
+                i, time_ns, expected, result
+            );
+        }
+    }
+
+    #[test]
+    #[should_panic(expected = "Test index 1 failed")]
+    fn test_ratelimiter_catches_failures() {
+        // Verify the test helper itself catches mismatches
+        let config = RateLimiterConfig::new(1.0, 0);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                (0, false),
+                // This should fail - second request at t=0 will be limited
+                (0, false),
+            ],
+        );
+    }
+
+    #[test]
+    fn zero_burst_base_rate() {
+        // With burst=0, only the base rate is allowed (no extra burst capacity)
+        let config = RateLimiterConfig::new(1.0, 0);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                // First request allowed
+                (0, false),
+                // Second request at same time is limited
+                (0, true),
+                // After 1 second, next request allowed
+                (NANOS_PER_SECOND, false),
+                // Immediately after, limited again
+                (NANOS_PER_SECOND, true),
+            ],
+        );
+    }
+
+    #[test]
+    fn burst_capacity_and_refill() {
+        // 1 request per second, extra burst of 2
+        // Should allow 3 requests at t=0 (1 base + 2 burst)
+        let config = RateLimiterConfig::new(1.0, 2);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                // Use up all burst at t=0
+                (0, false),
+                (0, false),
+                (0, false),
+                // Fourth request should be rate limited
+                (0, true),
+                // After 3 seconds, burst should be fully refilled
+                (3 * NANOS_PER_SECOND, false),
+                (3 * NANOS_PER_SECOND, false),
+                (3 * NANOS_PER_SECOND, false),
+                (3 * NANOS_PER_SECOND, true),
+            ],
+        );
+    }
+
+    #[test]
+    fn tokens_refill_over_time() {
+        // 1 request per second, extra burst of 1
+        let config = RateLimiterConfig::new(1.0, 1);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                // Use up burst at t=0 (1 base + 1 extra = 2 total)
+                (0, false),
+                (0, false),
+                (0, true),
+                // After 1 second, we should have 1 token available
+                (NANOS_PER_SECOND, false),
+                (NANOS_PER_SECOND, true),
+                // After 2 seconds from start, we should have another token
+                (2 * NANOS_PER_SECOND, false),
+            ],
+        );
+    }
+
+    #[test]
+    fn high_rate_limiter() {
+        // 1000 requests per second, no extra burst
+        let spacing_ns = NANOS_PER_SECOND / 1000; // 1ms between requests
+
+        let config = RateLimiterConfig::new(1000.0, 0);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                // First request allowed
+                (0, false),
+                // Immediate second request should be limited
+                (0, true),
+                // After 1ms, should be allowed
+                (spacing_ns, false),
+            ],
+        );
+    }
+
+    #[test]
+    fn steady_rate_within_limit() {
+        // 10 requests per second, no extra burst
+        let spacing_ns = NANOS_PER_SECOND / 10; // 100ms between requests
+
+        // Requests spaced exactly at the rate limit should all succeed
+        let config = RateLimiterConfig::new(10.0, 0);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                (0 * spacing_ns, false),
+                (1 * spacing_ns, false),
+                (2 * spacing_ns, false),
+                (3 * spacing_ns, false),
+                (4 * spacing_ns, false),
+                (5 * spacing_ns, false),
+                (6 * spacing_ns, false),
+                (7 * spacing_ns, false),
+                (8 * spacing_ns, false),
+                (9 * spacing_ns, false),
+            ],
+        );
+    }
+
+    #[test]
+    fn fractional_rate() {
+        // 0.5 requests per second = 1 request every 2 seconds
+        let config = RateLimiterConfig::new(0.5, 0);
+        test_ratelimiter(
+            RateLimiter::new(&config),
+            &[
+                // First request allowed
+                (0, false),
+                // After 1 second, still limited
+                (NANOS_PER_SECOND, true),
+                // After 2 seconds, allowed
+                (2 * NANOS_PER_SECOND, false),
+            ],
+        );
+    }
+}
diff --git a/foundations/src/telemetry/log/mod.rs b/foundations/src/telemetry/log/mod.rs
index 6dbee1f..7506a4b 100644
--- a/foundations/src/telemetry/log/mod.rs
+++ b/foundations/src/telemetry/log/mod.rs
@@ -203,10 +203,39 @@ macro_rules! __add_fields {
 /// ]);
 /// ```
 ///
+/// # Rate Limiting
+///
+/// Log messages can be rate limited using the `ratelimit` prefix to prevent log spam
+/// in hot paths. Each call site gets its own rate limiter, so the same log statement
+/// in a loop will be rate limited independently from other log statements. Rate limiters
+/// are rather cheap (a `fetch_update` CAS loop plus 32 bytes per call site), so feel free
+/// to add them.
+///
+/// The rate limiter uses the Generic Cell Rate Algorithm (GCRA), which provides smooth
+/// rate limiting without the burstiness of fixed time windows.
+///
+/// ## Parameters
+///
+/// - `rate`: The sustained rate in logs per second (e.g., `1.0` for 1 log/second)
+/// - `burst`: How many extra logs can be emitted before rate limiting kicks in; the burst refills over time.
+///
+/// ## Example
+///
+/// ```ignore
+/// // Allow 1 log per second with a burst of 3 (will emit up to 4 logs immediately,
+/// // then 1 per second thereafter)
+/// log::error!(ratelimit(rate=1.0, burst=3), "Connection failed"; "addr" => addr);
+/// ```
+///
 /// [`LoggingSettings::redact_keys`]: crate::telemetry::settings::LoggingSettings::redact_keys
 #[macro_export]
 #[doc(hidden)]
 macro_rules! __error {
+    (ratelimit(rate=$rate:expr, burst=$burst:expr), $($args:tt)+) => {
+        $crate::ratelimit!(rate=$rate, burst=$burst, {
+            $crate::__error!($($args)+)
+        });
+    };
     ( $($args:tt)+ ) => {
         $crate::reexports_for_macros::slog::error!(
             $crate::telemetry::log::internal::current_log().read(),
@@ -223,6 +252,9 @@ /// Certain added fields may not be present in the resulting logs if
 /// [`LoggingSettings::redact_keys`] is used.
 ///
+/// This macro supports `ratelimit(rate=<...>, burst=<...>)`. See the [`error!`] macro for
+/// more details.
+///
 /// # Examples
 /// ```
 /// use foundations::telemetry::TelemetryContext;
 ///
@@ -269,6 +301,11 @@
 #[doc(hidden)]
 #[macro_export]
 macro_rules! __warn {
+    (ratelimit(rate=$rate:expr, burst=$burst:expr), $($args:tt)+) => {
+        $crate::ratelimit!(rate=$rate, burst=$burst, {
+            $crate::__warn!($($args)+)
+        });
+    };
     ( $($args:tt)+ ) => {
         $crate::reexports_for_macros::slog::warn!(
             $crate::telemetry::log::internal::current_log().read(),
@@ -285,6 +322,9 @@ /// Certain added fields may not be present in the resulting logs if
 /// [`LoggingSettings::redact_keys`] is used.
 ///
+/// This macro supports `ratelimit(rate=<...>, burst=<...>)`. See the [`error!`] macro for
+/// more details.
+///
 /// # Examples
 /// ```
 /// use foundations::telemetry::TelemetryContext;
 ///
@@ -331,6 +371,11 @@
 #[macro_export]
 #[doc(hidden)]
 macro_rules! __debug {
+    (ratelimit(rate=$rate:expr, burst=$burst:expr), $($args:tt)+) => {
+        $crate::ratelimit!(rate=$rate, burst=$burst, {
+            $crate::__debug!($($args)+)
+        });
+    };
     ( $($args:tt)+ ) => {
         $crate::reexports_for_macros::slog::debug!(
             $crate::telemetry::log::internal::current_log().read(),
@@ -347,6 +392,9 @@ /// Certain added fields may not be present in the resulting logs if
 /// [`LoggingSettings::redact_keys`] is used.
 ///
+/// This macro supports `ratelimit(rate=<...>, burst=<...>)`. See the [`error!`] macro for
+/// more details.
+///
 /// # Examples
 /// ```
 /// use foundations::telemetry::TelemetryContext;
 ///
@@ -393,6 +441,11 @@
 #[macro_export]
 #[doc(hidden)]
 macro_rules! __info {
+    (ratelimit(rate=$rate:expr, burst=$burst:expr), $($args:tt)+) => {
+        $crate::ratelimit!(rate=$rate, burst=$burst, {
+            $crate::__info!($($args)+)
+        });
+    };
     ( $($args:tt)+ ) => {
         $crate::reexports_for_macros::slog::info!(
             $crate::telemetry::log::internal::current_log().read(),
@@ -409,6 +462,9 @@ /// Certain added fields may not be present in the resulting logs if
 /// [`LoggingSettings::redact_keys`] is used.
 ///
+/// This macro supports `ratelimit(rate=<...>, burst=<...>)`; see the [`error!`] macro for
+/// more details.
+///
 /// # Examples
 /// ```
 /// use foundations::telemetry::TelemetryContext;
 ///
@@ -455,6 +511,11 @@
 #[macro_export]
 #[doc(hidden)]
 macro_rules! __trace {
+    (ratelimit(rate=$rate:expr, burst=$burst:expr), $($args:tt)+) => {
+        $crate::ratelimit!(rate=$rate, burst=$burst, {
+            $crate::__trace!($($args)+)
+        });
+    };
     ( $($args:tt)+ ) => {
         $crate::reexports_for_macros::slog::trace!(
             $crate::telemetry::log::internal::current_log().read(),
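
Below is a minimal, self-contained sketch (not part of the patch above) of how the pieces introduced here fit together: the `RateLimiter`/`RateLimiterConfig` API and the per-callsite `ratelimit!` macro. The loop, the rates, and the sleep interval are hypothetical values chosen only for illustration.

```rust
use std::{thread::sleep, time::Duration};

use foundations::{ratelimit, RateLimiter, RateLimiterConfig};

fn main() {
    // Direct use of the GCRA limiter: 5 requests per second with 2 extra
    // burst slots (so up to 3 calls succeed back to back).
    let config = RateLimiterConfig::new(5.0, 2);
    let limiter = RateLimiter::new(&config);

    for i in 0..20 {
        if limiter.is_ratelimited() {
            println!("request {i}: rate limited");
        } else {
            println!("request {i}: allowed");
        }

        // Per-callsite macro: this block runs for the first few iterations
        // (the burst) and then at most about once per second, no matter how
        // hot the loop is.
        ratelimit!(rate = 1.0, burst = 3, {
            println!("periodic status at iteration {i}");
        });

        sleep(Duration::from_millis(50));
    }
}
```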