Skip to content

Commit 1b0d832

Browse files
committed
fix(rivetkit): stall stop handler until start completes
1 parent b7f29b8 commit 1b0d832

File tree

6 files changed

+272
-6
lines changed

6 files changed

+272
-6
lines changed

frontend/packages/example-registry/src/_gen.ts

Lines changed: 12 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import {
6868
workflowQueueActor,
6969
workflowSleepActor,
7070
} from "./workflow";
71+
import { startStopRaceActor, lifecycleObserver } from "./start-stop-race";
7172

7273
// Consolidated setup with all actors
7374
export const registry = setup({
@@ -160,5 +161,8 @@ export const registry = setup({
160161
dbActorDrizzle,
161162
// From stateless.ts
162163
statelessActor,
164+
// From start-stop-race.ts
165+
startStopRaceActor,
166+
lifecycleObserver,
163167
},
164168
});
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { actor } from "rivetkit";
2+
3+
/**
4+
* Actor designed to test start/stop race conditions.
5+
* Has a slow initialization to make race conditions easier to trigger.
6+
*/
7+
export const startStopRaceActor = actor({
8+
state: {
9+
initialized: false,
10+
startTime: 0,
11+
destroyCalled: false,
12+
startCompleted: false,
13+
},
14+
onWake: async (c) => {
15+
c.state.startTime = Date.now();
16+
17+
// Simulate slow initialization to create window for race condition
18+
await new Promise((resolve) => setTimeout(resolve, 100));
19+
20+
c.state.initialized = true;
21+
c.state.startCompleted = true;
22+
},
23+
onDestroy: (c) => {
24+
c.state.destroyCalled = true;
25+
// Don't save state here - the actor framework will save it automatically
26+
},
27+
actions: {
28+
getState: (c) => {
29+
return {
30+
initialized: c.state.initialized,
31+
startTime: c.state.startTime,
32+
destroyCalled: c.state.destroyCalled,
33+
startCompleted: c.state.startCompleted,
34+
};
35+
},
36+
ping: (c) => {
37+
return "pong";
38+
},
39+
destroy: (c) => {
40+
c.destroy();
41+
},
42+
},
43+
});
44+
45+
/**
46+
* Observer actor to track lifecycle events from other actors
47+
*/
48+
export const lifecycleObserver = actor({
49+
state: {
50+
events: [] as Array<{
51+
actorKey: string;
52+
event: string;
53+
timestamp: number;
54+
}>,
55+
},
56+
actions: {
57+
recordEvent: (c, params: { actorKey: string; event: string }) => {
58+
c.state.events.push({
59+
actorKey: params.actorKey,
60+
event: params.event,
61+
timestamp: Date.now(),
62+
});
63+
},
64+
getEvents: (c) => {
65+
return c.state.events;
66+
},
67+
clearEvents: (c) => {
68+
c.state.events = [];
69+
},
70+
},
71+
});

rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe } from "vitest";
22
import type { DriverTestConfig } from "../mod";
3+
import { runActorLifecycleTests } from "./actor-lifecycle";
34
import { runActorScheduleTests } from "./actor-schedule";
45
import { runActorSleepTests } from "./actor-sleep";
56
import { runActorStateTests } from "./actor-state";
@@ -14,5 +15,8 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfig) {
1415

1516
// Run actor sleep tests
1617
runActorSleepTests(driverTestConfig);
18+
19+
// Run actor lifecycle tests
20+
runActorLifecycleTests(driverTestConfig);
1721
});
1822
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import { describe, expect, test } from "vitest";
2+
import type { DriverTestConfig } from "../mod";
3+
import { setupDriverTest } from "../utils";
4+
5+
export function runActorLifecycleTests(driverTestConfig: DriverTestConfig) {
6+
describe("Actor Lifecycle Tests", () => {
7+
test("actor stop during start waits for start to complete", async (c) => {
8+
const { client } = await setupDriverTest(c, driverTestConfig);
9+
10+
const actorKey = `test-stop-during-start-${Date.now()}`;
11+
12+
// Create actor - this starts the actor
13+
const actor = client.startStopRaceActor.getOrCreate([actorKey]);
14+
15+
// Immediately try to call an action and then destroy
16+
// This creates a race where the actor might not be fully started yet
17+
const pingPromise = actor.ping();
18+
19+
// Get actor ID
20+
const actorId = await actor.resolve();
21+
22+
// Destroy immediately while start might still be in progress
23+
await actor.destroy();
24+
25+
// The ping should still complete successfully because destroy waits for start
26+
const result = await pingPromise;
27+
expect(result).toBe("pong");
28+
29+
// Verify actor was actually destroyed
30+
let destroyed = false;
31+
try {
32+
await client.startStopRaceActor.getForId(actorId).ping();
33+
} catch (err: any) {
34+
destroyed = true;
35+
expect(err.group).toBe("actor");
36+
expect(err.code).toBe("not_found");
37+
}
38+
expect(destroyed).toBe(true);
39+
});
40+
41+
test("actor stop before actor instantiation completes cleans up handler", async (c) => {
42+
const { client } = await setupDriverTest(c, driverTestConfig);
43+
44+
const actorKey = `test-stop-before-instantiation-${Date.now()}`;
45+
46+
// Create multiple actors rapidly to increase chance of race
47+
const actors = Array.from({ length: 5 }, (_, i) =>
48+
client.startStopRaceActor.getOrCreate([
49+
`${actorKey}-${i}`,
50+
]),
51+
);
52+
53+
// Resolve all actor IDs (this triggers start)
54+
const ids = await Promise.all(actors.map((a) => a.resolve()));
55+
56+
// Immediately destroy all actors
57+
await Promise.all(actors.map((a) => a.destroy()));
58+
59+
// Verify all actors were cleaned up
60+
for (const id of ids) {
61+
let destroyed = false;
62+
try {
63+
await client.startStopRaceActor.getForId(id).ping();
64+
} catch (err: any) {
65+
destroyed = true;
66+
expect(err.group).toBe("actor");
67+
expect(err.code).toBe("not_found");
68+
}
69+
expect(destroyed, `actor ${id} should be destroyed`).toBe(
70+
true,
71+
);
72+
}
73+
});
74+
75+
test("onBeforeActorStart completes before stop proceeds", async (c) => {
76+
const { client } = await setupDriverTest(c, driverTestConfig);
77+
78+
const actorKey = `test-before-actor-start-${Date.now()}`;
79+
80+
// Create actor
81+
const actor = client.startStopRaceActor.getOrCreate([actorKey]);
82+
83+
// Call action to ensure actor is starting
84+
const statePromise = actor.getState();
85+
86+
// Destroy immediately
87+
await actor.destroy();
88+
89+
// State should be initialized because onBeforeActorStart must complete
90+
const state = await statePromise;
91+
expect(state.initialized).toBe(true);
92+
expect(state.startCompleted).toBe(true);
93+
});
94+
95+
test("multiple rapid create/destroy cycles handle race correctly", async (c) => {
96+
const { client } = await setupDriverTest(c, driverTestConfig);
97+
98+
// Perform multiple rapid create/destroy cycles
99+
for (let i = 0; i < 10; i++) {
100+
const actorKey = `test-rapid-cycle-${Date.now()}-${i}`;
101+
const actor = client.startStopRaceActor.getOrCreate([
102+
actorKey,
103+
]);
104+
105+
// Trigger start
106+
const resolvePromise = actor.resolve();
107+
108+
// Immediately destroy
109+
const destroyPromise = actor.destroy();
110+
111+
// Both should complete without errors
112+
await Promise.all([resolvePromise, destroyPromise]);
113+
}
114+
115+
// If we get here without errors, the race condition is handled correctly
116+
expect(true).toBe(true);
117+
});
118+
119+
test("actor stop called with no actor instance cleans up handler", async (c) => {
120+
const { client } = await setupDriverTest(c, driverTestConfig);
121+
122+
const actorKey = `test-cleanup-no-instance-${Date.now()}`;
123+
124+
// Create and immediately destroy
125+
const actor = client.startStopRaceActor.getOrCreate([actorKey]);
126+
const id = await actor.resolve();
127+
await actor.destroy();
128+
129+
// Try to recreate with same key - should work without issues
130+
const newActor = client.startStopRaceActor.getOrCreate([
131+
actorKey,
132+
]);
133+
const result = await newActor.ping();
134+
expect(result).toBe("pong");
135+
136+
// Clean up
137+
await newActor.destroy();
138+
});
139+
140+
test("onDestroy is called even when actor is destroyed during start", async (c) => {
141+
const { client } = await setupDriverTest(c, driverTestConfig);
142+
143+
const actorKey = `test-ondestroy-during-start-${Date.now()}`;
144+
145+
// Create actor
146+
const actor = client.startStopRaceActor.getOrCreate([actorKey]);
147+
148+
// Start and immediately destroy
149+
const statePromise = actor.getState();
150+
await actor.destroy();
151+
152+
// Verify onDestroy was called (requires actor to be started)
153+
const state = await statePromise;
154+
expect(state.destroyCalled).toBe(true);
155+
});
156+
});
157+
}

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ export class EngineActorDriver implements ActorDriver {
153153
onConnected: () => {
154154
this.#runnerStarted.resolve(undefined);
155155
},
156-
onDisconnected: (_code, _reason) => {},
156+
onDisconnected: (_code, _reason) => { },
157157
onShutdown: () => {
158158
this.#runnerStopped.resolve(undefined);
159159
this.#isRunnerStopped = true;
@@ -356,7 +356,7 @@ export class EngineActorDriver implements ActorDriver {
356356
async serverlessHandleStart(c: HonoContext): Promise<Response> {
357357
return streamSSE(c, async (stream) => {
358358
// NOTE: onAbort does not work reliably
359-
stream.onAbort(() => {});
359+
stream.onAbort(() => { });
360360
c.req.raw.signal.addEventListener("abort", () => {
361361
logger().debug("SSE aborted, shutting down runner");
362362

@@ -498,7 +498,26 @@ export class EngineActorDriver implements ActorDriver {
498498
this.#actorStopIntent.delete(actorId);
499499

500500
const handler = this.#actors.get(actorId);
501-
if (handler?.actor) {
501+
if (!handler) {
502+
logger().debug({ msg: "no runner actor handler to stop", actorId, reason });
503+
return;
504+
}
505+
506+
if (handler.actorStartPromise) {
507+
try {
508+
logger().debug({ msg: "runner actor stopping before it started, waiting", actorId, generation });
509+
await handler.actorStartPromise.promise;
510+
} catch (err) {
511+
// Start failed, but we still want to clean up the handler
512+
logger().debug({
513+
msg: "actor start failed during stop, cleaning up handler",
514+
actorId,
515+
err: stringifyError(err),
516+
});
517+
}
518+
}
519+
520+
if (handler.actor) {
502521
try {
503522
await handler.actor.onStop(reason);
504523
} catch (err) {
@@ -507,9 +526,10 @@ export class EngineActorDriver implements ActorDriver {
507526
err: stringifyError(err),
508527
});
509528
}
510-
this.#actors.delete(actorId);
511529
}
512530

531+
this.#actors.delete(actorId);
532+
513533
logger().debug({ msg: "runner actor stopped", actorId, reason });
514534
}
515535

0 commit comments

Comments
 (0)