diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bd8fc6..a8cb4cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,15 +15,27 @@ jobs: strategy: # https://hexdocs.pm/elixir/compatibility-and-deprecations.html#between-elixir-and-erlang-otp matrix: - elixir: [1.17, 1.16, 1.15] - otp: [26, 25] include: + # Latest + - elixir: 1.19 + otp: 28 + - elixir: 1.19 + otp: 27 + # 1.18 - elixir: 1.18 otp: 27 + - elixir: 1.18 + otp: 26 + # 1.17 - elixir: 1.17 otp: 27 - - elixir: 1.14 - otp: 25 + - elixir: 1.17 + otp: 26 + # Older supported + - elixir: 1.16 + otp: 26 + - elixir: 1.15 + otp: 26 steps: - uses: actions/checkout@v5 @@ -56,6 +68,6 @@ jobs: - uses: actions/checkout@v5 - uses: erlef/setup-beam@v1 with: - elixir-version: 1 - otp-version: 27 + elixir-version: 1.19 + otp-version: 28 - run: mix format --check-formatted diff --git a/.tool-versions b/.tool-versions index ef426e5..3d664e3 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.18.2-otp-27 -erlang 27.1.2 +elixir 1.19.5-otp-28 +erlang 28.3.1 diff --git a/README.md b/README.md index 9ad9d8b..7e74dd7 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,6 @@ **Hammer** is a rate-limiter for Elixir with pluggable storage backends. Hammer enables users to set limits on actions performed within specified time intervals, applying per-user or global limits on API requests, file uploads, and more. ---- - -> [!NOTE] -> -> This README is for the unreleased master branch, please reference the [official documentation on hexdocs](https://hexdocs.pm/hammer) for the latest stable release. - ---- - ## Installation Hammer is [available in Hex](https://hex.pm/packages/hammer). Install by adding `:hammer` to your list of dependencies in `mix.exs`: diff --git a/lib/hammer/atomic.ex b/lib/hammer/atomic.ex index 4f472bb..0c67385 100644 --- a/lib/hammer/atomic.ex +++ b/lib/hammer/atomic.ex @@ -81,7 +81,7 @@ defmodule Hammer.Atomic do def start_link(opts) do opts = Keyword.put(opts, :table, @table) opts = Keyword.put_new(opts, :clean_period, :timer.minutes(1)) - opts = Keyword.put_new(opts, :algorithm, @algorithm) + opts = Keyword.put_new(opts, :algorithm_module, @algorithm) Hammer.Atomic.start_link(opts) end @@ -136,7 +136,7 @@ defmodule Hammer.Atomic do {clean_period, opts} = Keyword.pop!(opts, :clean_period) {table, opts} = Keyword.pop!(opts, :table) - {algorithm, opts} = Keyword.pop!(opts, :algorithm) + {algorithm_module, opts} = Keyword.pop!(opts, :algorithm_module) {key_older_than, opts} = Keyword.pop(opts, :key_older_than, :timer.hours(24)) case opts do @@ -151,10 +151,10 @@ defmodule Hammer.Atomic do config = %{ table: table, - table_opts: algorithm.ets_opts(), + table_opts: algorithm_module.ets_opts(), clean_period: clean_period, key_older_than: key_older_than, - algorithm: algorithm + algorithm_module: algorithm_module } GenServer.start_link(__MODULE__, config, gen_opts) diff --git a/lib/hammer/atomic/leaky_bucket.ex b/lib/hammer/atomic/leaky_bucket.ex index 6c8a9a6..d4fedeb 100644 --- a/lib/hammer/atomic/leaky_bucket.ex +++ b/lib/hammer/atomic/leaky_bucket.ex @@ -86,6 +86,8 @@ defmodule Hammer.Atomic.LeakyBucket do MyApp.RateLimit.hit("user_123", 100, 500, 1) """ + import Bitwise + @doc false @spec ets_opts() :: list() def ets_opts do @@ -99,6 +101,19 @@ defmodule Hammer.Atomic.LeakyBucket do ] end + # Pack timestamp (seconds) and fill level into one 64-bit value for atomic CAS. + # High 32 bits: timestamp, Low 32 bits: fill level + @compile {:inline, pack: 2, unpack: 1} + defp pack(timestamp, fill) do + (timestamp &&& 0xFFFFFFFF) <<< 32 ||| (fill &&& 0xFFFFFFFF) + end + + defp unpack(packed) do + timestamp = packed >>> 32 &&& 0xFFFFFFFF + fill = packed &&& 0xFFFFFFFF + {timestamp, fill} + end + @doc """ Checks if a key is allowed to perform an action, and increment the counter by the given amount. """ @@ -110,34 +125,51 @@ defmodule Hammer.Atomic.LeakyBucket do cost :: pos_integer() ) :: {:allow, non_neg_integer()} | {:deny, non_neg_integer()} def hit(table, key, leak_rate, capacity, cost) do - # bucket window now = System.system_time(:second) case :ets.lookup(table, key) do [{_, atomic}] -> - # Get current bucket state - current_fill = :atomics.get(atomic, 1) - last_update = :atomics.get(atomic, 2) + do_hit(atomic, now, leak_rate, capacity, cost) - leaked = trunc((now - last_update) * leak_rate) + [] -> + atomic = :atomics.new(2, signed: false) - # Subtract leakage from current level (don't go below 0) - current_fill = max(0, current_fill - leaked) + if :ets.insert_new(table, {key, atomic}) do + # Initialize with empty bucket at current time + initial_packed = pack(now, 0) + :atomics.put(atomic, 1, initial_packed) + :atomics.put(atomic, 2, now) + end - if current_fill < capacity do - final_level = current_fill + cost + hit(table, key, leak_rate, capacity, cost) + end + end + + defp do_hit(atomic, now, leak_rate, capacity, cost) do + current_packed = :atomics.get(atomic, 1) + {last_update, current_fill} = unpack(current_packed) + + leaked = trunc((now - last_update) * leak_rate) + + # Subtract leakage from current level (don't go below 0) + current_fill = max(0, current_fill - leaked) - :atomics.exchange(atomic, 1, final_level) - :atomics.exchange(atomic, 2, now) + if current_fill < capacity do + final_level = current_fill + cost + new_packed = pack(now, final_level) + case :atomics.compare_exchange(atomic, 1, current_packed, new_packed) do + :ok -> + # Update slot 2 for cleanup tracking (non-critical, just for age-based cleanup) + :atomics.put(atomic, 2, now) {:allow, final_level} - else - {:deny, 1000} - end - [] -> - :ets.insert_new(table, {key, :atomics.new(2, signed: false)}) - hit(table, key, leak_rate, capacity, cost) + _current_value -> + # CAS failed, another process modified the value; retry + do_hit(atomic, now, leak_rate, capacity, cost) + end + else + {:deny, 1000} end end @@ -151,7 +183,9 @@ defmodule Hammer.Atomic.LeakyBucket do 0 [{_, atomic}] -> - :atomics.get(atomic, 1) + packed = :atomics.get(atomic, 1) + {_timestamp, fill} = unpack(packed) + fill _ -> 0 diff --git a/lib/hammer/atomic/token_bucket.ex b/lib/hammer/atomic/token_bucket.ex index 9437017..eff8335 100644 --- a/lib/hammer/atomic/token_bucket.ex +++ b/lib/hammer/atomic/token_bucket.ex @@ -90,6 +90,8 @@ defmodule Hammer.Atomic.TokenBucket do MyApp.RateLimit.hit("user_123", 10, 100, 1) """ + import Bitwise + @doc false @spec ets_opts() :: list() def ets_opts do @@ -103,6 +105,19 @@ defmodule Hammer.Atomic.TokenBucket do ] end + # Pack timestamp (seconds) and fill level into one 64-bit value for atomic CAS. + # High 32 bits: timestamp, Low 32 bits: fill level + @compile {:inline, pack: 2, unpack: 1} + defp pack(timestamp, fill) do + (timestamp &&& 0xFFFFFFFF) <<< 32 ||| (fill &&& 0xFFFFFFFF) + end + + defp unpack(packed) do + timestamp = packed >>> 32 &&& 0xFFFFFFFF + fill = packed &&& 0xFFFFFFFF + {timestamp, fill} + end + @doc """ Checks if a key is allowed to perform an action, and consume the bucket by the given amount. """ @@ -114,41 +129,51 @@ defmodule Hammer.Atomic.TokenBucket do cost :: pos_integer() ) :: {:allow, non_neg_integer()} | {:deny, non_neg_integer()} def hit(table, key, refill_rate, capacity, cost \\ 1) do - # bucket window now = System.system_time(:second) case :ets.lookup(table, key) do [{_, atomic}] -> - # Get current bucket state - current_fill = :atomics.get(atomic, 1) - last_update = :atomics.get(atomic, 2) - - new_tokens = trunc((now - last_update) * refill_rate) - - current_tokens = min(capacity, current_fill + new_tokens) - - if current_tokens >= cost do - final_level = current_tokens - cost - - :atomics.exchange(atomic, 1, final_level) - :atomics.exchange(atomic, 2, now) - - {:allow, final_level} - else - {:deny, 1000} - end + do_hit(atomic, now, refill_rate, capacity, cost) [] -> atomic = :atomics.new(2, signed: false) if :ets.insert_new(table, {key, atomic}) do - :atomics.exchange(atomic, 1, capacity) + initial_packed = pack(now, capacity) + :atomics.put(atomic, 1, initial_packed) + :atomics.put(atomic, 2, now) end hit(table, key, refill_rate, capacity, cost) end end + defp do_hit(atomic, now, refill_rate, capacity, cost) do + current_packed = :atomics.get(atomic, 1) + {last_update, current_fill} = unpack(current_packed) + + new_tokens = trunc((now - last_update) * refill_rate) + current_tokens = min(capacity, current_fill + new_tokens) + + if current_tokens >= cost do + final_level = current_tokens - cost + new_packed = pack(now, final_level) + + case :atomics.compare_exchange(atomic, 1, current_packed, new_packed) do + :ok -> + # Update slot 2 for cleanup tracking (non-critical, just for age-based cleanup) + :atomics.put(atomic, 2, now) + {:allow, final_level} + + _current_value -> + # CAS failed, another process modified the value; retry + do_hit(atomic, now, refill_rate, capacity, cost) + end + else + {:deny, 1000} + end + end + @doc """ Returns the current level of the bucket for a given key. """ @@ -159,7 +184,9 @@ defmodule Hammer.Atomic.TokenBucket do 0 [{_, atomic}] -> - :atomics.get(atomic, 1) + packed = :atomics.get(atomic, 1) + {_timestamp, fill} = unpack(packed) + fill _ -> 0 diff --git a/lib/hammer/ets.ex b/lib/hammer/ets.ex index 0098056..63935e1 100644 --- a/lib/hammer/ets.ex +++ b/lib/hammer/ets.ex @@ -144,8 +144,6 @@ defmodule Hammer.ETS do - `:clean_period` - How often to run the cleanup process (in milliseconds). Defaults to 1 minute. - `:key_older_than` - Optional maximum age for bucket entries (in milliseconds). Defaults to 24 hours. Entries older than this will be removed during cleanup. - - `:algorithm` - The rate limiting algorithm to use. Can be `:fixed_window`, `:sliding_window`, - `:token_bucket`, or `:leaky_bucket`. Defaults to `:fixed_window`. - optional `:debug`, `:spawn_opts`, and `:hibernate_after` GenServer options """ @spec start_link([start_option]) :: GenServer.on_start()