Fix race condition causing bookmark queue items to accumulate during rapid-succession resumes #7225

Copilot wants to merge 4 commits into release/3.6.0 from
Conversation
This change ensures that bookmark queue items are always deleted after processing, regardless of whether matching bookmarks were found. This prevents duplicate queue items from accumulating in the database when the same bookmark is queued multiple times in rapid succession. The distributed lock in WorkflowResumer ensures that the actual bookmark is only processed once, so it's safe to always delete the queue item.

Fixes a race condition where:
1. Multiple rapid requests create duplicate queue items
2. The first item processes and deletes the bookmark
3. The second item finds no bookmark but was NOT deleted
4. Queue items remain stuck in the database indefinitely

Co-authored-by: sfmskywalker <938393+sfmskywalker@users.noreply.github.com>
Enhanced the fix to ensure queue items are always deleted even if an exception occurs during workflow resumption. The deletion is now in a finally block with its own try-catch to handle any deletion failures gracefully. This addresses the code review feedback and ensures robustness in all scenarios:
- Normal processing (bookmark found or not)
- Exception during workflow resumption (timeout, network error, etc.)
- Exception during deletion itself (database connectivity issues)

Co-authored-by: sfmskywalker <938393+sfmskywalker@users.noreply.github.com>
Enhanced the fix based on code review feedback:
1. Added error handling at the ProcessPageAsync level to ensure one failing item doesn't block processing of other items in the batch
2. Added explicit exception logging in ProcessItemAsync to surface errors during workflow resumption
3. Changed deletion to use CancellationToken.None to ensure cleanup happens even during application shutdown
4. Added detailed comments explaining the error handling strategy

These improvements ensure the system is robust and provides visibility into any issues while still guaranteeing queue items are cleaned up properly.

Co-authored-by: sfmskywalker <938393+sfmskywalker@users.noreply.github.com>
Pull request overview
Addresses a race condition where rapid successive resume requests can leave duplicate bookmark queue items stuck in the queue, leading to accumulation and unprocessed workflows.
Changes:
- Wrap per-item processing in `ProcessPageAsync` with error handling to keep batch processing moving.
- Update `ProcessItemAsync` to always delete the queue item via a `finally` block, and add logging for resume failures.
```csharp
foreach (var bookmarkQueueItem in page.Items)
{
    try
    {
        await ProcessItemAsync(bookmarkQueueItem, cancellationToken);
    }
    catch (Exception ex)
    {
        // Log the error but continue processing other items in the batch
        logger.LogError(ex, "Error processing bookmark queue item {BookmarkQueueItemId}. Continuing with next item.", bookmarkQueueItem.Id);
    }
}
```
ProcessPageAsync catches all exceptions and continues, which will also catch OperationCanceledException if cancellation is requested (e.g., during shutdown) and ProcessItemAsync starts propagating it. This can lead to noisy error logs and the processor continuing work even though cancellation was requested. Handle OperationCanceledException separately (rethrow) and/or break out of the loop when cancellationToken.IsCancellationRequested is true.
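One way to apply this suggestion, sketched against the loop quoted above (the surrounding method shape is assumed, not taken from the diff):

```csharp
foreach (var bookmarkQueueItem in page.Items)
{
    // Stop promptly once shutdown has been requested instead of logging cancellations as errors.
    cancellationToken.ThrowIfCancellationRequested();

    try
    {
        await ProcessItemAsync(bookmarkQueueItem, cancellationToken);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Cancellation is not a failure; let it propagate to the caller.
        throw;
    }
    catch (Exception ex)
    {
        // Log the error but continue processing other items in the batch.
        logger.LogError(ex, "Error processing bookmark queue item {BookmarkQueueItemId}. Continuing with next item.", bookmarkQueueItem.Id);
    }
}
```

The `when` filter matters: it lets an `OperationCanceledException` thrown for an unrelated token still be logged, while a genuine shutdown request short-circuits the loop.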
```diff
-            logger.LogDebug("Successfully resumed {WorkflowCount} workflow instances using stimulus {StimulusHash} for activity type {ActivityType}", responses.Count, item.StimulusHash, item.ActivityTypeName);
-            await store.DeleteAsync(item.Id, cancellationToken);
+        catch (Exception ex)
+        {
+            logger.LogError(ex, "Error resuming workflow for bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId}. The queue item will still be deleted to prevent accumulation.", item.Id, item.WorkflowInstanceId);
+        }
```
The catch (Exception ex) here will also catch OperationCanceledException when cancellationToken is canceled, log it as an error, and then continue processing more items. This makes shutdown/cancellation noisy and prevents normal cancellation flow. Add an OperationCanceledException catch/when-filter that rethrows when cancellationToken.IsCancellationRequested, and only log unexpected exceptions.
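A minimal sketch of the suggested shape for `ProcessItemAsync` (the resume call body is elided; only the exception-filter pattern is the point):

```csharp
try
{
    // ... resume matching workflows for this queue item ...
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
    // Normal cancellation flow: rethrow without an error-level log entry.
    throw;
}
catch (Exception ex)
{
    // Only unexpected failures are logged as errors.
    logger.LogError(ex, "Error resuming workflow for bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId}.", item.Id, item.WorkflowInstanceId);
}
```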
```csharp
        logger.LogDebug("No matching bookmarks found for bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} for activity type {ActivityType} with stimulus {StimulusHash}", item.Id, item.WorkflowInstanceId, item.ActivityTypeName, item.StimulusHash);
    }
    finally
    {
        // Always delete the queue item after processing, regardless of whether bookmarks were found or an exception occurred.
        // This prevents duplicate queue items from accumulating when the same bookmark is queued multiple times in rapid succession.
        // The distributed lock in WorkflowResumer ensures that the actual bookmark is only processed once.
        // Use CancellationToken.None to ensure cleanup happens even during application shutdown.
        try
        {
            await store.DeleteAsync(item.Id, CancellationToken.None);
        }
```
Because queue items are now always deleted, the offset-based pagination in ProcessAsync can skip items: after deleting the first page, incrementing offset causes the next query to skip records in the remaining set, potentially leaving items unprocessed until the next trigger (or even until they are purged by TTL). Consider switching the loop to always read from offset 0 until no items remain, or use cursor-based paging (e.g., last CreatedAt/Id) instead of offsets when deleting during iteration.
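The restart-from-zero variant can be sketched as below. The store API names (`PageAsync`, `PageArgs`, `BatchSize`) are assumptions for illustration, not the project's actual signatures:

```csharp
// Because every processed item is deleted, always re-read from the start of the
// remaining set; advancing an offset over a shrinking set would skip records.
while (!cancellationToken.IsCancellationRequested)
{
    // Hypothetical paging call: take the first BatchSize items still in the queue.
    var page = await store.PageAsync(PageArgs.From(0, BatchSize), cancellationToken);

    if (page.Items.Count == 0)
        break;

    await ProcessPageAsync(page, cancellationToken);
}
```

Cursor-based paging (keyed on `CreatedAt`/`Id` of the last processed item) is the alternative when items may survive a pass, since restart-from-zero can loop on an item that repeatedly fails to delete.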
```csharp
// Use CancellationToken.None to ensure cleanup happens even during application shutdown.
try
{
    await store.DeleteAsync(item.Id, CancellationToken.None);
}
```
Using CancellationToken.None for the delete means this await can block indefinitely if the underlying store call hangs (e.g., DB connectivity issues), which is especially risky during shutdown. Prefer using a bounded token (e.g., a short CancelAfter timeout) so cleanup is attempted but can't stall the processor forever.
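A bounded-token sketch of that suggestion; the 5-second budget is an arbitrary illustrative choice:

```csharp
// Attempt cleanup even during shutdown, but cap how long it may block.
using var cleanupCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    await store.DeleteAsync(item.Id, cleanupCts.Token);
}
catch (Exception ex)
{
    // If deletion times out or fails, the TTL purge task remains the fallback.
    logger.LogWarning(ex, "Failed to delete bookmark queue item {BookmarkQueueItemId}.", item.Id);
}
```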
When multiple `/tasks/{taskid}/complete` requests arrive in rapid succession, duplicate queue items are created for the same bookmark. The original implementation only deleted queue items when `responses.Count > 0`, causing the second queue item to remain stuck after the bookmark was already processed.

Changes
Modified `BookmarkQueueProcessor.ProcessItemAsync()`
- Deletion moved into a `finally` block: always executes regardless of bookmark presence or exceptions
- `CancellationToken.None` for deletion to ensure cleanup during shutdown

Modified `BookmarkQueueProcessor.ProcessPageAsync()`
- Per-item error handling so one failing item doesn't block the rest of the batch

Before/After
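The deletion-placement change can be sketched roughly as follows (method bodies abbreviated; `store`, `logger`, and `responses` follow the names visible in the quoted diff):

```csharp
// Before: deletion only ran when matching bookmarks were found.
if (responses.Count > 0)
{
    await store.DeleteAsync(item.Id, cancellationToken);
}

// After: deletion always runs, even on resume failure or shutdown.
try
{
    // ... resume matching workflows ...
}
finally
{
    try
    {
        await store.DeleteAsync(item.Id, CancellationToken.None);
    }
    catch (Exception ex)
    {
        logger.LogWarning(ex, "Failed to delete bookmark queue item {BookmarkQueueItemId}.", item.Id);
    }
}
```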
Safety
The distributed lock in `WorkflowResumer.ResumeAsync()` ensures bookmarks are only processed once. Queue items are ephemeral markers, so they are safe to delete after a processing attempt. The existing 1-minute TTL purge task serves as a fallback for any deletion failures.