1010#include < workerd/util/ring-buffer.h>
1111#include < workerd/util/small-set.h>
1212#include < workerd/util/state-machine.h>
13+ #include < workerd/util/weak-refs.h>
1314
1415namespace workerd ::api {
1516
@@ -165,22 +166,18 @@ class QueueImpl final {
165166 // Detach all consumers before destruction to prevent UAF.
166167 // This can happen during isolate teardown when the destruction order
167168 // of JS wrapper objects doesn't follow the ownership hierarchy.
168- auto consumers = allConsumers.snapshot ();
169- for (auto consumer: consumers) {
170- consumer->detachQueue ();
171- }
169+ allConsumers.forEach ([&](ConsumerImpl& consumer) { consumer.detachQueue (); });
172170 }
173171
174172 // Closes the queue. The close is forwarded on to all consumers.
175173 // If we are already closed or errored, do nothing here.
176174 void close (jsg::Lock& js) {
177175 if (state.isActive ()) {
178- // We copy the list of consumers in case the consumers remove themselves
179- // from the queue during the close callback, invalidating the iterator.
180- auto consumers = allConsumers.snapshot ();
181- for (auto consumer: consumers) {
182- consumer->close (js);
183- }
176+ #ifdef KJ_DEBUG
177+ isClosingOrErroring = true ;
178+ KJ_DEFER (isClosingOrErroring = false );
179+ #endif
180+ allConsumers.forEach ([&](ConsumerImpl& consumer) { consumer.close (js); });
184181 state.template transitionTo <Closed>();
185182 }
186183 }
@@ -199,12 +196,11 @@ class QueueImpl final {
199196 // If we are already closed or errored, do nothing here.
200197 void error (jsg::Lock& js, jsg::Value reason) {
201198 if (state.isActive ()) {
202- // We copy the list of consumers in case the consumers remove themselves
203- // from the queue during the error callback, invalidating the iterator.
204- auto consumers = allConsumers.snapshot ();
205- for (auto consumer: consumers) {
206- consumer->error (js, reason.addRef (js));
207- }
199+ #ifdef KJ_DEBUG
200+ isClosingOrErroring = true ;
201+ KJ_DEFER (isClosingOrErroring = false );
202+ #endif
203+ allConsumers.forEach ([&](ConsumerImpl& consumer) { consumer.error (js, reason.addRef (js)); });
208204 state.template transitionTo <Errored>(kj::mv (reason));
209205 }
210206 }
@@ -215,9 +211,9 @@ class QueueImpl final {
215211 void maybeUpdateBackpressure () {
216212 totalQueueSize = 0 ;
217213 if (state.isActive ()) {
218- for ( auto consumer: allConsumers.snapshot () ) {
219- totalQueueSize = kj::max (totalQueueSize, consumer-> size ());
220- }
214+ allConsumers.forEach ([&](ConsumerImpl& consumer ) {
215+ totalQueueSize = kj::max (totalQueueSize, consumer. size ());
216+ });
221217 }
222218 }
223219
@@ -229,15 +225,14 @@ class QueueImpl final {
229225 void push (jsg::Lock& js, kj::Rc<Entry> entry, kj::Maybe<ConsumerImpl&> skipConsumer = kj::none) {
230226 state.requireActiveUnsafe (" The queue is closed or errored." );
231227
232- for ( auto consumer: allConsumers.snapshot () ) {
228+ allConsumers.forEach ([&](ConsumerImpl& consumer ) {
233229 KJ_IF_SOME (skip, skipConsumer) {
234- if (&skip == consumer) {
235- continue ;
230+ if (&skip == & consumer) {
231+ return ;
236232 }
237233 }
238-
239- consumer->push (js, entry->clone (js));
240- }
234+ consumer.push (js, entry->clone (js));
235+ });
241236 }
242237
243238 // The current size of consumer with the most stored data.
@@ -251,8 +246,10 @@ class QueueImpl final {
251246
252247 bool wantsRead () const {
253248 if (state.isActive ()) {
254- for (auto consumer: allConsumers.snapshot ()) {
255- if (consumer->hasReadRequests ()) return true ;
249+ for (const auto & weakRef: allConsumers) {
250+ KJ_IF_SOME (consumer, weakRef->tryGet ()) {
251+ if (consumer.hasReadRequests ()) return true ;
252+ }
256253 }
257254 }
258255 return false ;
@@ -302,14 +299,31 @@ class QueueImpl final {
302299 // will be a very small number (often just one or two), so we use SmallSet to
303300 // optimize for that. This persists across state transitions so we can detach
304301 // consumers even after close()/error() transitions the queue to a terminal state.
305- SmallSet<ConsumerImpl*> allConsumers;
306-
307- void addConsumer (ConsumerImpl* consumer) {
308- allConsumers.add (consumer);
302+ //
303+ // We store weak references to consumers to safely handle the case where a consumer
304+ // is destroyed during iteration (e.g., resolving a read request triggers JS that
305+ // destroys another consumer in the same queue). When iterating, we check if the WeakRef is still valid.
306+ SmallSet<kj::Rc<WeakRef<ConsumerImpl>>> allConsumers;
307+
308+ #ifdef KJ_DEBUG
309+ // Debug flag to detect if addConsumer is called during close/error iteration.
310+ // This should never happen - it would indicate a bug in the streams implementation.
311+ bool isClosingOrErroring = false ;
312+ #endif
313+
314+ void addConsumer (kj::Rc<WeakRef<ConsumerImpl>> weakRef) {
315+ KJ_DASSERT (
316+ !isClosingOrErroring, " Cannot add a consumer while the queue is being closed or errored" );
317+ allConsumers.add (kj::mv (weakRef));
309318 }
310319
311- void removeConsumer (ConsumerImpl* consumer) {
312- allConsumers.remove (consumer);
320+ void removeConsumer (ConsumerImpl& consumer) {
321+ allConsumers.removeIf ([&consumer](const kj::Rc<WeakRef<ConsumerImpl>>& ref) {
322+ KJ_IF_SOME (c, ref->tryGet ()) {
323+ return &c == &consumer;
324+ }
325+ return false ; // Already invalid, will be cleaned up later
326+ });
313327 maybeUpdateBackpressure ();
314328 }
315329
@@ -356,7 +370,7 @@ class ConsumerImpl final {
356370 : queue(queue),
357371 state (ConsumerState::template create<Ready>()),
358372 stateListener(stateListener) {
359- queue.addConsumer (this );
373+ queue.addConsumer (selfRef. addRef () );
360374 }
361375
362376 explicit ConsumerImpl (kj::Maybe<ConsumerImpl::StateListener&> stateListener)
@@ -369,9 +383,13 @@ class ConsumerImpl final {
369383 ~ConsumerImpl () noexcept (false ) {
370384 // queue may be none if the queue was destroyed before this consumer
371385 // (e.g., during isolate teardown) or if cloned from a closed stream.
386+ // We must remove ourselves before invalidating selfRef, otherwise
387+ // removeConsumer won't find us (tryGet() would return none).
372388 KJ_IF_SOME (q, queue) {
373- q.removeConsumer (this );
389+ q.removeConsumer (* this );
374390 }
391+ // Invalidate after removal so any concurrent iteration will skip us.
392+ selfRef->invalidate ();
375393 }
376394
377395 // Called by QueueImpl destructor to detach this consumer from a queue
@@ -587,6 +605,11 @@ class ConsumerImpl final {
587605 kj::Maybe<QueueImpl&> queue;
588606 ConsumerState state;
589607 kj::Maybe<ConsumerImpl::StateListener&> stateListener;
608+ // WeakRef to this consumer, used for safe registration with QueueImpl.
609+ // When this consumer is destroyed, we invalidate the WeakRef so that
610+ // any iteration over allConsumers in QueueImpl will safely skip us.
611+ kj::Rc<WeakRef<ConsumerImpl>> selfRef =
612+ kj::rc<WeakRef<ConsumerImpl>>(kj::Badge<ConsumerImpl>{}, *this );
590613
591614 bool isClosing () {
592615 // Closing state is determined by whether there is a Close sentinel that has been
0 commit comments