Skip to content

Lost messages from workflow.NewChannel(ctx) #2066

@yuandrew

Description

@yuandrew

Expected Behavior

No messages get lost

Actual Behavior

Messages are getting lost. This bug was introduced in 1.35.0, from #1762

Steps to Reproduce the Problem

Setting unblockSelectorSignal to true causes items to be lost; the test below passes when it is set to false.

// TestChannelWorkerPattern pushes numItems integers through an input channel
// to numWorkers worker coroutines. Each worker receives items via a Selector,
// doubles them and sends the result on an output channel. The workflow then
// collects numItems outputs, and the test asserts that every input has its
// doubled output recorded.
//
// The test runs with unblockSelectorSignal set to true, the setting under
// which items were reported lost.
func (s *WorkflowTestSuiteUnitTest) TestChannelWorkerPattern() {
	// Flip the package-level flag for this test only, and restore whatever
	// value it had before so later tests in the same process are unaffected.
	prevUnblockSelectorSignal := unblockSelectorSignal
	unblockSelectorSignal = true // test passes when set to false
	defer func() { unblockSelectorSignal = prevUnblockSelectorSignal }()

	assert := s.Assert()
	require := s.Require()

	// Test configuration
	numWorkers := 10
	numItems := 50
	items := make([]int, numItems)
	for i := range items {
		items[i] = i
	}
	// Track which items were processed
	processedItems := make(map[int]int) // maps input -> output
	// Simple workflow that mimics the channel worker pattern
	workflowFn := func(ctx Context) error {
		// Create input channel and feed items; closing it tells the workers
		// (via more == false) that no further items will arrive.
		inputCh := NewChannel(ctx)
		Go(ctx, func(ctx Context) {
			for _, item := range items {
				inputCh.Send(ctx, item)
			}
			inputCh.Close()
		})

		// Start workers
		outputCh := NewChannel(ctx)
		wg := NewWaitGroup(ctx)
		wg.Add(numWorkers)
		// Wait group cleanup goroutine: close the output channel once every
		// worker has finished so the collector loop cannot block forever.
		Go(ctx, func(ctx Context) {
			wg.Wait(ctx)
			outputCh.Close()
		})
		// Start worker goroutines
		for i := 0; i < numWorkers; i++ {
			Go(ctx, func(ctx Context) {
				defer wg.Done()
				stop := false
				selector := NewSelector(ctx)
				// Stop worker if workflow is cancelled
				selector.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) {
					stop = true
				})
				// Receive items from input channel
				selector.AddReceive(inputCh, func(ch ReceiveChannel, more bool) {
					// more == false means the input channel is closed.
					stop = !more
					var input int
					if ch.Receive(ctx, &input) {
						output := input * 2
						outputCh.Send(ctx, output)
					}
				})
				// Worker loop
				for !stop {
					selector.Select(ctx)
				}
			})
		}
		// Collect outputs
		for i := 0; i < numItems; i++ {
			var output int
			if outputCh.Receive(ctx, &output) {
				// output = input * 2, so input = output / 2
				processedItems[output/2] = output
			}
		}
		return nil
	}
	env := s.NewTestWorkflowEnvironment()
	env.RegisterWorkflow(workflowFn)
	env.ExecuteWorkflow(workflowFn)

	require.True(env.IsWorkflowCompleted())
	require.NoError(env.GetWorkflowError())

	// Verify every input item has its doubled output recorded
	require.Len(processedItems, numItems, "Not all items were processed")
	for i := 0; i < numItems; i++ {
		output, ok := processedItems[i]
		assert.True(ok, "Item %d was not processed", i)
		assert.Equal(i*2, output, "Item %d was not processed correctly", i)
	}
}

Specifications

  • Version: 1.35.0
  • Platform:

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions