
[Detail Bug] Removed S2 worker not awaited; can panic by sending on closed channel #148

@detail-app


Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_736e2e66-41f1-4fa6-b7ed-ad4a7194853c

Summary

  • Context: The streamsManager function manages multiple worker goroutines that read from S2 streams and forward data to a shared channel.
  • Bug: When a stream is removed from the list of streams to monitor, the corresponding worker goroutine is cancelled and deleted from the tracking map, but the code does not wait for the worker to finish before continuing.
  • Actual vs. expected: The removed worker goroutine continues running and may attempt to write to the inputStream channel after it has been closed by the manager, whereas it should be fully shut down before the manager closes the channel.
  • Impact: The application can panic with "send on closed channel" if a removed worker attempts to send after the manager has closed inputStream during shutdown.

Code with bug

In s2-bentobox/multi_stream_input.go:

for stream, worker := range existingWorkers {
	if _, found := newStreamsSet[stream]; !found {
		if config.Logger != nil {
			config.Logger.Warn("not reading from S2 source anymore", "stream", stream)
		}
		worker.Close()                          // Cancel the worker's context
		delete(existingWorkers, stream)         // Remove from map // <-- BUG 🔴 Missing worker.Wait(); goroutine may still be running
	}
}
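The issue does not show the worker type itself. The sketch below assumes a hypothetical minimal shape that matches the two method names used here. Close() cancels the worker's context and returns immediately. Wait() blocks until the goroutine has actually returned. Everything except the Close/Wait names (startWorker, the sync.WaitGroup field, the simulated cleanup delay) is an illustrative assumption, not the real s2-bentobox code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a HYPOTHETICAL stand-in for the per-stream worker; only the
// Close/Wait method names come from the issue.
type worker struct {
	cancel context.CancelFunc // cancels the worker's context
	wg     sync.WaitGroup     // tracks the worker goroutine
}

// startWorker (hypothetical) runs fn in a goroutine with its own cancellable context.
func startWorker(parent context.Context, fn func(ctx context.Context)) *worker {
	ctx, cancel := context.WithCancel(parent)
	w := &worker{cancel: cancel}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		fn(ctx)
	}()
	return w
}

// Close only signals cancellation; it does not wait for the goroutine.
func (w *worker) Close() { w.cancel() }

// Wait blocks until the goroutine has returned.
func (w *worker) Wait() { w.wg.Wait() }

// closeThenWait reports whether the worker has exited once Wait() returns.
func closeThenWait() bool {
	exited := make(chan struct{})
	w := startWorker(context.Background(), func(ctx context.Context) {
		<-ctx.Done()                      // observe cancellation
		time.Sleep(20 * time.Millisecond) // simulate cleanup or an in-flight read
		close(exited)
	})

	w.Close() // cancellation requested; the goroutine is probably still running
	select {
	case <-exited:
		fmt.Println("after Close: worker already exited")
	default:
		fmt.Println("after Close: worker still running")
	}

	w.Wait() // the goroutine is now guaranteed to have returned
	select {
	case <-exited:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println("after Wait, exited:", closeThenWait())
}
```

The "after Close" line is timing-dependent; only the result after Wait() is guaranteed.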

Example

The following timing-dependent sequence can cause a panic:

  • The worker for "stream1" is removed. The manager calls worker.Close() and deletes the worker from existingWorkers, but never calls worker.Wait(), so the goroutine may still be running.
  • The manager later begins shutdown and closes inputStream. Because the removed worker is no longer in existingWorkers, the manager does not account for it before closing the channel.
  • The orphaned worker returns from an in-flight read and tries to forward the batch to inputStream, which is now closed, causing a panic.
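A self-contained simulation of these three steps might look like the sketch below. It assumes a simplified recvOutput with only a Stream field and a readDone channel standing in for the S2 read; neither reflects the real s2-bentobox types. Each trial cancels the worker without waiting, closes inputStream, then lets the in-flight read return. Because the worker's select then sees both cases ready, some trials take the send and panic. The panic is recovered only so the trials can be counted.

```go
package main

import (
	"context"
	"fmt"
)

// recvOutput is a simplified stand-in for the real type (hypothetical fields).
type recvOutput struct{ Stream string }

// trialWithoutWait replays the race: Close() without Wait(), then close the
// channel, then let the orphaned worker's in-flight read complete.
// It reports whether the worker panicked.
func trialWithoutWait() bool {
	inputStream := make(chan recvOutput)
	ctx, cancel := context.WithCancel(context.Background())
	readDone := make(chan struct{}) // stands in for an S2 read still in flight
	result := make(chan bool, 1)

	go func() {
		// Recover only so the trial can be counted instead of crashing.
		defer func() { result <- recover() != nil }()
		<-readDone // the read returns a batch
		select {
		case inputStream <- recvOutput{Stream: "stream1"}:
		case <-ctx.Done():
		}
	}()

	cancel()           // step 1: worker.Close(), but no worker.Wait()
	close(inputStream) // step 2: shutdown closes the channel
	close(readDone)    // step 3: the orphaned worker resumes
	return <-result
}

// countPanics runs n independent trials and counts how many panicked.
func countPanics(n int) int {
	panics := 0
	for i := 0; i < n; i++ {
		if trialWithoutWait() {
			panics++
		}
	}
	return panics
}

func main() {
	fmt.Printf("%d of 1000 trials panicked\n", countPanics(1000))
}
```

The exact count varies from run to run; since neither select case has priority, roughly half of the trials are expected to panic.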

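If both cases are ready, Go's select picks one of them pseudo-randomly rather than favoring ctx.Done(), so the worker may attempt the send even after ctx.Done() is closed. If inputStream has already been closed, that send panics: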

select {
case inputStream <- recvOutput{Stream: stream, Batch: batch, AckFunc: aFn}:
case <-ctx.Done():
	return
}
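The Go specification states that when more than one communication in a select can proceed, one is chosen "via a uniform pseudo-random selection"; ctx.Done() gets no priority. The sketch below isolates that behavior without a closed data channel. The context is cancelled up front and the send target is a buffered channel with a free slot, so both cases are ready on every iteration. The helper name chooseCounts is illustrative only.

```go
package main

import (
	"context"
	"fmt"
)

// chooseCounts runs a select with two ready cases n times and counts which
// case the runtime picks. Nothing panics here: the send target is a
// buffered channel with free space, not a closed channel.
func chooseCounts(n int) (sends, cancels int) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // ctx.Done() is already closed before every select

	for i := 0; i < n; i++ {
		out := make(chan int, 1) // one free buffer slot, so the send is ready too
		select {
		case out <- i:
			sends++
		case <-ctx.Done():
			cancels++
		}
	}
	return sends, cancels
}

func main() {
	s, c := chooseCounts(1000)
	fmt.Printf("send chosen %d times, ctx.Done() chosen %d times\n", s, c)
}
```

Both counters end up non-zero, which is why checking ctx.Done() in the same select does not prevent the send.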

Recommended fix

Wait for each removed worker to finish before deleting it from the map, so that no removed worker can still be running when the manager later closes inputStream:

for stream, worker := range existingWorkers {
	if _, found := newStreamsSet[stream]; !found {
		if config.Logger != nil {
			config.Logger.Warn("not reading from S2 source anymore", "stream", stream)
		}
		worker.Close()
		worker.Wait()  // <-- FIX 🟢 Wait for worker to finish
		delete(existingWorkers, stream)
	}
}
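The same simulated sequence with the fix applied could look like this: the manager cancels the worker, waits for it, and only then closes inputStream. The types and the readDone channel standing in for an in-flight S2 read are assumptions, not the real s2-bentobox code. Once Wait() returns, no goroutine can still send, so close(inputStream) cannot trigger the panic.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// recvOutput is a simplified stand-in for the real type (hypothetical fields).
type recvOutput struct{ Stream string }

// trialWithWait replays the removal-then-shutdown sequence with the fix:
// cancel the worker, wait for it, and only then close the channel.
func trialWithWait() (panicked bool) {
	inputStream := make(chan recvOutput)
	ctx, cancel := context.WithCancel(context.Background())
	readDone := make(chan struct{}) // stands in for an S2 read still in flight
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			// Recover only so a panic would be counted rather than crash the program.
			if recover() != nil {
				panicked = true
			}
		}()
		<-readDone
		select {
		case inputStream <- recvOutput{Stream: "stream1"}:
		case <-ctx.Done():
		}
	}()

	cancel()           // worker.Close()
	close(readDone)    // the in-flight read returns; the worker then sees ctx.Done()
	wg.Wait()          // worker.Wait(): the goroutine has returned
	close(inputStream) // safe: no removed worker can still send
	return panicked
}

// countPanics runs n independent trials and counts how many panicked.
func countPanics(n int) int {
	panics := 0
	for i := 0; i < n; i++ {
		if trialWithWait() {
			panics++
		}
	}
	return panics
}

func main() {
	fmt.Println("panics:", countPanics(1000))
}
```

This relies on every blocking point in the worker also returning once its context is cancelled; if the real S2 read ignored cancellation, worker.Wait() could stall the manager's loop. Deleting the current key inside a range loop, as the fix does, is permitted by the Go specification.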

Metadata


Assignees

No one assigned

Labels

area:bento (Tickets related to the bento plugin)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
