Skip to content

Commit 9524546

Browse files
committed
feat(rivetkit): integrate workflows in to actors
1 parent 8a67460 commit 9524546

File tree

31 files changed

+1926
-147
lines changed

31 files changed

+1926
-147
lines changed

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ import { rawHttpRequestPropertiesActor } from "./raw-http-request-properties";
3737
import { rawWebSocketActor, rawWebSocketBinaryActor } from "./raw-websocket";
3838
import { requestAccessActor } from "./request-access";
3939
import { rejectConnectionActor } from "./reject-connection";
40+
import {
41+
runWithError,
42+
runWithEarlyExit,
43+
runWithoutHandler,
44+
runWithQueueConsumer,
45+
runWithTicks,
46+
} from "./run";
4047
import { scheduled } from "./scheduled";
4148
import {
4249
sleep,
@@ -52,6 +59,11 @@ import {
5259
staticVarActor,
5360
uniqueVarActor,
5461
} from "./vars";
62+
import {
63+
workflowCounterActor,
64+
workflowQueueActor,
65+
workflowSleepActor,
66+
} from "./workflow";
5567

5668
// Consolidated setup with all actors
5769
export const registry = setup({
@@ -127,5 +139,15 @@ export const registry = setup({
127139
// From large-payloads.ts
128140
largePayloadActor,
129141
largePayloadConnActor,
142+
// From run.ts
143+
runWithTicks,
144+
runWithQueueConsumer,
145+
runWithEarlyExit,
146+
runWithError,
147+
runWithoutHandler,
148+
// From workflow.ts
149+
workflowCounterActor,
150+
workflowQueueActor,
151+
workflowSleepActor,
130152
},
131153
});
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import { actor } from "rivetkit";
2+
import type { registry } from "./registry";
3+
4+
export const RUN_SLEEP_TIMEOUT = 500;
5+
6+
// Actor that tracks tick counts and respects abort signal
7+
export const runWithTicks = actor({
8+
state: {
9+
tickCount: 0,
10+
lastTickAt: 0,
11+
runStarted: false,
12+
runExited: false,
13+
},
14+
run: async (c) => {
15+
c.state.runStarted = true;
16+
c.log.info("run handler started");
17+
18+
while (!c.abortSignal.aborted) {
19+
c.state.tickCount += 1;
20+
c.state.lastTickAt = Date.now();
21+
c.log.info({ msg: "tick", tickCount: c.state.tickCount });
22+
23+
// Wait 50ms between ticks, or exit early if aborted
24+
await new Promise<void>((resolve) => {
25+
const timeout = setTimeout(resolve, 50);
26+
c.abortSignal.addEventListener(
27+
"abort",
28+
() => {
29+
clearTimeout(timeout);
30+
resolve();
31+
},
32+
{ once: true },
33+
);
34+
});
35+
}
36+
37+
c.state.runExited = true;
38+
c.log.info("run handler exiting gracefully");
39+
},
40+
actions: {
41+
getState: (c) => ({
42+
tickCount: c.state.tickCount,
43+
lastTickAt: c.state.lastTickAt,
44+
runStarted: c.state.runStarted,
45+
runExited: c.state.runExited,
46+
}),
47+
},
48+
options: {
49+
sleepTimeout: RUN_SLEEP_TIMEOUT,
50+
runStopTimeout: 1000,
51+
},
52+
});
53+
54+
// Actor that consumes from a queue in the run handler
55+
export const runWithQueueConsumer = actor({
56+
state: {
57+
messagesReceived: [] as Array<{ name: string; body: unknown }>,
58+
runStarted: false,
59+
},
60+
run: async (c) => {
61+
c.state.runStarted = true;
62+
c.log.info("run handler started, waiting for messages");
63+
64+
while (!c.abortSignal.aborted) {
65+
const message = await c.queue.next("messages", { timeout: 100 });
66+
if (message) {
67+
c.log.info({ msg: "received message", body: message.body });
68+
c.state.messagesReceived.push({
69+
name: message.name,
70+
body: message.body,
71+
});
72+
}
73+
}
74+
75+
c.log.info("run handler exiting gracefully");
76+
},
77+
actions: {
78+
getState: (c) => ({
79+
messagesReceived: c.state.messagesReceived,
80+
runStarted: c.state.runStarted,
81+
}),
82+
sendMessage: async (c, body: unknown) => {
83+
const client = c.client<typeof registry>();
84+
const handle = client.runWithQueueConsumer.getForId(c.actorId);
85+
await handle.queue.messages.send(body);
86+
return true;
87+
},
88+
},
89+
options: {
90+
sleepTimeout: RUN_SLEEP_TIMEOUT,
91+
runStopTimeout: 1000,
92+
},
93+
});
94+
95+
// Actor that exits the run handler after a short delay to test crash behavior
96+
export const runWithEarlyExit = actor({
97+
state: {
98+
runStarted: false,
99+
destroyCalled: false,
100+
},
101+
run: async (c) => {
102+
c.state.runStarted = true;
103+
c.log.info("run handler started, will exit after delay");
104+
// Wait a bit so we can observe the runStarted state before exit
105+
await new Promise((resolve) => setTimeout(resolve, 200));
106+
c.log.info("run handler exiting early");
107+
// Exit without respecting abort signal
108+
},
109+
onDestroy: (c) => {
110+
c.state.destroyCalled = true;
111+
},
112+
actions: {
113+
getState: (c) => ({
114+
runStarted: c.state.runStarted,
115+
destroyCalled: c.state.destroyCalled,
116+
}),
117+
},
118+
options: {
119+
sleepTimeout: RUN_SLEEP_TIMEOUT,
120+
},
121+
});
122+
123+
// Actor that throws an error in the run handler to test crash behavior
124+
export const runWithError = actor({
125+
state: {
126+
runStarted: false,
127+
destroyCalled: false,
128+
},
129+
run: async (c) => {
130+
c.state.runStarted = true;
131+
c.log.info("run handler started, will throw error");
132+
await new Promise((resolve) => setTimeout(resolve, 50));
133+
throw new Error("intentional error in run handler");
134+
},
135+
onDestroy: (c) => {
136+
c.state.destroyCalled = true;
137+
},
138+
actions: {
139+
getState: (c) => ({
140+
runStarted: c.state.runStarted,
141+
destroyCalled: c.state.destroyCalled,
142+
}),
143+
},
144+
options: {
145+
sleepTimeout: RUN_SLEEP_TIMEOUT,
146+
},
147+
});
148+
149+
// Actor without a run handler for comparison
150+
export const runWithoutHandler = actor({
151+
state: {
152+
wakeCount: 0,
153+
},
154+
onWake: (c) => {
155+
c.state.wakeCount += 1;
156+
},
157+
actions: {
158+
getState: (c) => ({
159+
wakeCount: c.state.wakeCount,
160+
}),
161+
},
162+
options: {
163+
sleepTimeout: RUN_SLEEP_TIMEOUT,
164+
},
165+
});
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import { Loop } from "@rivetkit/workflow-engine";
2+
import { actor } from "@/actor/mod";
3+
import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants";
4+
import { workflow, workflowQueueName } from "@/workflow/mod";
5+
6+
const WORKFLOW_QUEUE_NAME = "workflow-default";
7+
8+
export const workflowCounterActor = actor({
9+
state: {
10+
runCount: 0,
11+
guardTriggered: false,
12+
history: [] as number[],
13+
},
14+
run: workflow(async (ctx) => {
15+
await ctx.loop({
16+
name: "counter",
17+
run: async (loopCtx) => {
18+
const actorLoopCtx = loopCtx as any;
19+
try {
20+
// Accessing state outside a step should throw.
21+
// biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor.
22+
actorLoopCtx.state;
23+
} catch {}
24+
25+
await loopCtx.step("increment", async () => {
26+
actorLoopCtx.state.runCount += 1;
27+
actorLoopCtx.state.history.push(actorLoopCtx.state.runCount);
28+
});
29+
30+
await loopCtx.sleep("idle", 25);
31+
return Loop.continue(undefined);
32+
},
33+
});
34+
}),
35+
actions: {
36+
getState: async (c) => {
37+
const guardFlag = await c.kv.get(WORKFLOW_GUARD_KV_KEY);
38+
if (guardFlag === "true") {
39+
c.state.guardTriggered = true;
40+
}
41+
return c.state;
42+
},
43+
},
44+
options: {
45+
sleepTimeout: 50,
46+
},
47+
});
48+
49+
export const workflowQueueActor = actor({
50+
state: {
51+
received: [] as unknown[],
52+
},
53+
run: workflow(async (ctx) => {
54+
await ctx.loop({
55+
name: "queue",
56+
run: async (loopCtx) => {
57+
const actorLoopCtx = loopCtx as any;
58+
const payload = await loopCtx.listen(
59+
"queue-wait",
60+
WORKFLOW_QUEUE_NAME,
61+
);
62+
await loopCtx.step("store-message", async () => {
63+
actorLoopCtx.state.received.push(payload);
64+
});
65+
return Loop.continue(undefined);
66+
},
67+
});
68+
}),
69+
actions: {
70+
getMessages: (c) => c.state.received,
71+
},
72+
});
73+
74+
export const workflowSleepActor = actor({
75+
state: {
76+
ticks: 0,
77+
},
78+
run: workflow(async (ctx) => {
79+
await ctx.loop({
80+
name: "sleep",
81+
run: async (loopCtx) => {
82+
const actorLoopCtx = loopCtx as any;
83+
await loopCtx.step("tick", async () => {
84+
actorLoopCtx.state.ticks += 1;
85+
});
86+
await loopCtx.sleep("delay", 40);
87+
return Loop.continue(undefined);
88+
},
89+
});
90+
}),
91+
actions: {
92+
getState: (c) => c.state,
93+
},
94+
options: {
95+
sleepTimeout: 50,
96+
},
97+
});
98+
99+
export { WORKFLOW_QUEUE_NAME, workflowQueueName };

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
"default": "./dist/tsup/mod.cjs"
3434
}
3535
},
36+
"./workflow": {
37+
"import": {
38+
"types": "./dist/tsup/workflow/mod.d.ts",
39+
"default": "./dist/tsup/workflow/mod.js"
40+
},
41+
"require": {
42+
"types": "./dist/tsup/workflow/mod.d.cts",
43+
"default": "./dist/tsup/workflow/mod.cjs"
44+
}
45+
},
3646
"./client": {
3747
"import": {
3848
"types": "./dist/tsup/client/mod.d.ts",
@@ -162,7 +172,7 @@
162172
"./dist/tsup/chunk-*.cjs"
163173
],
164174
"scripts": {
165-
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
175+
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts src/workflow/mod.ts",
166176
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts",
167177
"check-types": "tsc --noEmit",
168178
"lint": "biome check .",
@@ -179,6 +189,7 @@
179189
"dependencies": {
180190
"@hono/standard-validator": "^0.1.3",
181191
"@hono/zod-openapi": "^1.1.5",
192+
"@rivetkit/workflow-engine": "workspace:*",
182193
"@rivetkit/bare-ts": "^0.6.2",
183194
"@rivetkit/engine-runner": "workspace:*",
184195
"@rivetkit/fast-json-patch": "^3.1.2",

0 commit comments

Comments
 (0)