-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Labels
area:bentoTickets related to the bento pluginTickets related to the bento plugin
Description
Detail Bug Report
Summary
- Context: The
streamsManagerfunction manages multiple worker goroutines that read from S2 streams and forward data to a shared channel. - Bug: When a stream is removed from the list of streams to monitor, the corresponding worker goroutine is cancelled and deleted from the tracking map, but the code does not wait for the worker to finish before continuing.
- Actual vs. expected: The removed worker goroutine continues running and may attempt to write to the
inputStreamchannel after it has been closed by the manager, whereas it should be fully shut down before the manager closes the channel. - Impact: The application will panic with "send on closed channel" when a removed worker attempts to write data after the
inputStreamchannel is closed during shutdown.
Code with bug
In s2-bentobox/multi_stream_input.go:
for stream, worker := range existingWorkers {
if _, found := newStreamsSet[stream]; !found {
if config.Logger != nil {
config.Logger.Warn("not reading from S2 source anymore", "stream", stream)
}
worker.Close() // Cancel the worker's context
delete(existingWorkers, stream) // Remove from map // <-- BUG 🔴 Missing worker.Wait(); goroutine may still be running
}
}Example
A timing-dependent race can panic:
- A worker for "stream1" is removed. The manager calls
worker.Close()and deletes it fromexistingWorkersbut does notWait(); the goroutine may still be running. - The manager later begins shutdown and closes
inputStreambecause it believes no workers remain. - The orphaned worker returns from a read and attempts to forward a batch to
inputStream, which is now closed, causing a panic.
The worker's send select is non-deterministic and may choose the send even if ctx.Done() is closed:
select {
case inputStream <- recvOutput{Stream: stream, Batch: batch, AckFunc: aFn}:
case <-ctx.Done():
return
}Recommended fix
Wait for removed workers to finish before deleting them and before any potential channel close:
for stream, worker := range existingWorkers {
if _, found := newStreamsSet[stream]; !found {
if config.Logger != nil {
config.Logger.Warn("not reading from S2 source anymore", "stream", stream)
}
worker.Close()
worker.Wait() // <-- FIX 🟢 Wait for worker to finish
delete(existingWorkers, stream)
}
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
area:bentoTickets related to the bento pluginTickets related to the bento plugin