|
| 1 | +/** |
| 2 | + * A counter that can be atomically incremented and decremented, and which |
| 3 | + * callers can synchronously listen for it becoming non-zero. |
| 4 | + * |
| 5 | + * This can be "open" or "closed"; once it's closed, {@link wait} will no longer |
| 6 | + * emit useful events. |
| 7 | + */ |
| 8 | +export class AtomicCounter { |
| 9 | + /** |
| 10 | + * The underlying BigInt64Array. |
| 11 | + * |
| 12 | + * The first BigInt64 represents the current value of the counter. The second |
| 13 | + * BigInt64 is used to track the closed state. |
| 14 | + */ |
| 15 | + private readonly buffer: BigInt64Array; |
| 16 | + |
| 17 | + constructor(buffer: SharedArrayBuffer) { |
| 18 | + if (buffer.byteLength !== 16) { |
| 19 | + throw new Error('SharedArrayBuffer must have a byteLength of 16.'); |
| 20 | + } |
| 21 | + this.buffer = new BigInt64Array(buffer); |
| 22 | + } |
| 23 | + |
| 24 | + /** Atomically decrement the current value by one. */ |
| 25 | + decrement(): void { |
| 26 | + Atomics.sub(this.buffer, 0, 1n); |
| 27 | + } |
| 28 | + |
| 29 | + /** Atomically increment the current value by one. */ |
| 30 | + increment(): void { |
| 31 | + if (Atomics.add(this.buffer, 0, 1n) === 0n) { |
| 32 | + Atomics.notify(this.buffer, 0, 1); |
| 33 | + } |
| 34 | + } |
| 35 | + |
| 36 | + /** |
| 37 | + * Closes the counter. |
| 38 | + * |
| 39 | + * This will cause any outstanding calls to {@link wait} on any thread to |
| 40 | + * return `true` immediately. |
| 41 | + */ |
| 42 | + close(): void { |
| 43 | + // The current value is no longer relevant once closed, therefore set it to |
| 44 | + // non-zero to prevent a potential deadlock when `Atomics.notify` is called |
| 45 | + // immediately before `Atomics.wait`. |
| 46 | + if ( |
| 47 | + Atomics.compareExchange(this.buffer, 1, 0n, 1n) === 0n && |
| 48 | + Atomics.compareExchange(this.buffer, 0, 0n, 1n) === 0n |
| 49 | + ) { |
| 50 | + Atomics.notify(this.buffer, 0); |
| 51 | + } |
| 52 | + } |
| 53 | + |
| 54 | + /** |
| 55 | + * Waits until the current value is not zero or the counter is closed. |
| 56 | + * |
| 57 | + * Returns `true` when the counter is non-zero *or* when it's closed. Returns |
| 58 | + * `false` if the counter remains zero for `timeout` milliseconds. |
| 59 | + */ |
| 60 | + wait(timeout?: number): boolean { |
| 61 | + while ( |
| 62 | + Atomics.load(this.buffer, 0) === 0n && |
| 63 | + Atomics.load(this.buffer, 1) === 0n |
| 64 | + ) { |
| 65 | + const result = Atomics.wait(this.buffer, 0, 0n, timeout); |
| 66 | + if (result !== 'ok') { |
| 67 | + return result !== 'timed-out'; |
| 68 | + } |
| 69 | + } |
| 70 | + return true; |
| 71 | + } |
| 72 | +} |
0 commit comments