Skip to content

Commit 0365b3b

Browse files
committed
fix(executor): prevent race condition in pause persistence
1 parent a280a53 commit 0365b3b

File tree

2 files changed

+122
-24
lines changed

2 files changed

+122
-24
lines changed

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,28 +121,14 @@ export class PauseResumeManager {
121121

122122
const now = new Date()
123123

124-
await db
125-
.insert(pausedExecutions)
126-
.values({
127-
id: randomUUID(),
128-
workflowId,
129-
executionId,
130-
executionSnapshot: snapshotSeed,
131-
pausePoints: pausePointsRecord,
132-
totalPauseCount: pausePoints.length,
133-
resumedCount: 0,
134-
status: 'paused',
135-
metadata: {
136-
pauseScope: 'execution',
137-
triggerIds: snapshotSeed.triggerIds,
138-
executorUserId: executorUserId ?? null,
139-
},
140-
pausedAt: now,
141-
updatedAt: now,
142-
})
143-
.onConflictDoUpdate({
144-
target: pausedExecutions.executionId,
145-
set: {
124+
// Wrap persistence in a transaction to prevent race conditions with concurrent resume requests
125+
await db.transaction(async (tx) => {
126+
await tx
127+
.insert(pausedExecutions)
128+
.values({
129+
id: randomUUID(),
130+
workflowId,
131+
executionId,
146132
executionSnapshot: snapshotSeed,
147133
pausePoints: pausePointsRecord,
148134
totalPauseCount: pausePoints.length,
@@ -153,10 +139,28 @@ export class PauseResumeManager {
153139
triggerIds: snapshotSeed.triggerIds,
154140
executorUserId: executorUserId ?? null,
155141
},
142+
pausedAt: now,
156143
updatedAt: now,
157-
},
158-
})
144+
})
145+
.onConflictDoUpdate({
146+
target: pausedExecutions.executionId,
147+
set: {
148+
executionSnapshot: snapshotSeed,
149+
pausePoints: pausePointsRecord,
150+
totalPauseCount: pausePoints.length,
151+
resumedCount: 0,
152+
status: 'paused',
153+
metadata: {
154+
pauseScope: 'execution',
155+
triggerIds: snapshotSeed.triggerIds,
156+
executorUserId: executorUserId ?? null,
157+
},
158+
updatedAt: now,
159+
},
160+
})
161+
})
159162

163+
// Process queued resumes after transaction commits to ensure visibility
160164
await PauseResumeManager.processQueuedResumes(executionId)
161165
}
162166

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* @vitest-environment node
3+
*
4+
* Tests for Issue #3081: Race Condition between pause persistence and resume requests
5+
*/
6+
import { databaseMock, loggerMock } from '@sim/testing'
7+
import { beforeEach, describe, expect, it, vi } from 'vitest'
8+
9+
vi.mock('@sim/db', () => databaseMock)
10+
vi.mock('@sim/logger', () => loggerMock)
11+
12+
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
13+
import type { PausePoint, SerializedSnapshot } from '@/executor/types'
14+
15+
describe('PauseResumeManager - Race Condition Fix (#3081)', () => {
16+
beforeEach(() => {
17+
vi.clearAllMocks()
18+
})
19+
20+
const createTestSnapshot = (): SerializedSnapshot => ({
21+
snapshot: JSON.stringify({
22+
workflow: { blocks: [], connections: [] },
23+
state: { blockStates: {}, executedBlocks: [] },
24+
}),
25+
triggerIds: [],
26+
})
27+
28+
const createTestPausePoints = (): PausePoint[] => [
29+
{
30+
contextId: 'test-context',
31+
blockId: 'pause-block-1',
32+
response: {},
33+
resumeStatus: 'paused',
34+
snapshotReady: true,
35+
registeredAt: new Date().toISOString(),
36+
},
37+
]
38+
39+
describe('persistPauseResult', () => {
40+
it.concurrent('should use database transaction for atomic persistence', async () => {
41+
const mockInsert = vi.fn().mockReturnValue({
42+
values: vi.fn().mockReturnValue({
43+
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
44+
}),
45+
})
46+
47+
const mockTransaction = vi.fn().mockImplementation(async (callback) => {
48+
const mockTx = { insert: mockInsert }
49+
return await callback(mockTx as any)
50+
})
51+
52+
vi.mocked(databaseMock.db.transaction).mockImplementation(mockTransaction)
53+
vi.spyOn(PauseResumeManager, 'processQueuedResumes').mockResolvedValue(undefined)
54+
55+
await PauseResumeManager.persistPauseResult({
56+
workflowId: 'test-workflow',
57+
executionId: 'test-execution',
58+
pausePoints: createTestPausePoints(),
59+
snapshotSeed: createTestSnapshot(),
60+
executorUserId: 'test-user',
61+
})
62+
63+
expect(mockTransaction).toHaveBeenCalledTimes(1)
64+
expect(mockInsert).toHaveBeenCalled()
65+
})
66+
67+
it.concurrent('should call processQueuedResumes after transaction', async () => {
68+
const mockInsert = vi.fn().mockReturnValue({
69+
values: vi.fn().mockReturnValue({
70+
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
71+
}),
72+
})
73+
74+
vi.mocked(databaseMock.db.transaction).mockImplementation(async (callback) => {
75+
const mockTx = { insert: mockInsert }
76+
return await callback(mockTx as any)
77+
})
78+
79+
const processQueuedResumesSpy = vi
80+
.spyOn(PauseResumeManager, 'processQueuedResumes')
81+
.mockResolvedValue(undefined)
82+
83+
await PauseResumeManager.persistPauseResult({
84+
workflowId: 'test-workflow',
85+
executionId: 'test-execution',
86+
pausePoints: createTestPausePoints(),
87+
snapshotSeed: createTestSnapshot(),
88+
executorUserId: 'test-user',
89+
})
90+
91+
expect(processQueuedResumesSpy).toHaveBeenCalledWith('test-execution')
92+
})
93+
})
94+
})

0 commit comments

Comments
 (0)