-
Notifications
You must be signed in to change notification settings - Fork 286
Open
Labels
bug: Something isn't working
Description
Expected Behavior
No messages get lost
Actual Behavior
Messages are getting lost. This bug was introduced in 1.35.0 by #1762.
Steps to Reproduce the Problem
Setting unblockSelectorSignal to true causes items to be lost. The following test reproduces the problem:
// TestChannelWorkerPattern runs a fan-out/fan-in workflow: one coroutine feeds
// numItems integers into an input channel, numWorkers coroutines each use a
// Selector to pull items, double them, and push them to an output channel, and
// the workflow body collects numItems results. The test then asserts that
// every input was seen exactly once with the expected doubled output.
//
// Per the inline note below, the test passes when unblockSelectorSignal is
// false and fails (items are missing) when it is true.
func (s *WorkflowTestSuiteUnitTest) TestChannelWorkerPattern() {
// NOTE(review): unblockSelectorSignal is not declared in this snippet and is
// never restored here — presumably a package-level flag; confirm it does not
// leak into other tests.
unblockSelectorSignal = true // test passes when set to false
assert := s.Assert()
require := s.Require()
// Test configuration: 10 workers share 50 items, with items[i] == i.
numWorkers := 10
numItems := 50
items := make([]int, numItems)
for i := 0; i < numItems; i++ {
items[i] = i
}
// Track which items were processed. Written by the workflow body's collect
// loop and read by the assertions after the workflow finishes.
processedItems := make(map[int]int) // maps input -> output
// Simple workflow that mimics the channel worker pattern
workflowFn := func(ctx Context) error {
// Create input channel and feed items from a dedicated coroutine, closing
// the channel once every item has been sent.
inputCh := NewChannel(ctx)
Go(ctx, func(ctx Context) {
for _, item := range items {
inputCh.Send(ctx, item)
}
inputCh.Close()
})
// Start workers: the wait group counts one Done per worker.
outputCh := NewChannel(ctx)
wg := NewWaitGroup(ctx)
wg.Add(numWorkers)
// Wait group cleanup goroutine: closes the output channel after all
// workers have called Done.
Go(ctx, func(ctx Context) {
wg.Wait(ctx)
outputCh.Close()
})
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
Go(ctx, func(ctx Context) {
defer wg.Done()
// stop is flipped by the selector callbacks below and ends the worker loop.
stop := false
selector := NewSelector(ctx)
// Stop worker if workflow is cancelled
selector.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) {
stop = true
})
// Receive items from input channel
selector.AddReceive(inputCh, func(ch ReceiveChannel, more bool) {
// Stop once the selector reports there is nothing more on the channel.
stop = !more
var input int
// Only forward a result when Receive reports a value was delivered.
if ch.Receive(ctx, &input) {
output := input * 2
// NOTE(review): this Send runs inside the selector callback; presumably
// it can block there, which may be the interaction with
// unblockSelectorSignal that loses items — confirm.
outputCh.Send(ctx, output)
}
})
// Worker loop: keep selecting until one of the callbacks sets stop.
for !stop {
selector.Select(ctx)
}
})
}
// Collect outputs: attempt exactly numItems receives. An iteration whose
// Receive returns false records nothing, so lost items show up as missing
// keys in processedItems.
for i := 0; i < numItems; i++ {
var output int
if outputCh.Receive(ctx, &output) {
// output = input * 2, so input = output / 2
processedItems[output/2] = output
}
}
return nil
}
// Run the workflow in the test environment and require a clean completion.
env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(workflowFn)
env.ExecuteWorkflow(workflowFn)
require.True(env.IsWorkflowCompleted())
require.NoError(env.GetWorkflowError())
// Verify all items were processed exactly once: the map must have one entry
// per input, and each entry must hold the doubled value.
require.Len(processedItems, numItems, "Not all items were processed")
for i := 0; i < numItems; i++ {
output, ok := processedItems[i]
assert.True(ok, "Item %d was not processed", i)
assert.Equal(i*2, output, "Item %d was not processed correctly", i)
}
}
Specifications
- Version: 1.35.0
- Platform:
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
bug: Something isn't working