
feat(rivetkit): integrate workflows in to actors#4006

Closed
NathanFlurry wants to merge 1 commit into 01-02-feat_rivetkit_workflows from 01-21-feat_rivetkit_integrate_workflows_in_to_actors

Conversation

@NathanFlurry
Member

No description provided.

Member Author

NathanFlurry commented Jan 22, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Jan 22, 2026

PR Review: feat(rivetkit): integrate workflows into actors

Summary

This PR integrates the workflow engine into RivetKit actors, allowing developers to use actor({ run: workflow(async ctx => { ... }) }) syntax. The implementation is well-architected with proper separation of concerns, comprehensive testing, and good adherence to the project's conventions.

Positive Highlights

Architecture & Design

  • Clean abstraction layers: ActorWorkflowDriver and ActorWorkflowContext provide excellent separation between workflow engine and actor runtime
  • Deterministic execution: The guard system preventing state access outside steps is clever and prevents non-deterministic bugs
  • Live mode integration: Using mode: "live" keeps workflows responsive to messages and alarms without constant polling
  • Comprehensive test coverage: Test fixtures cover basic operations, queue integration, and state guards

Code Quality

  • Type safety: Proper generic constraints maintain type safety across the stack
  • Error handling: Graceful error handling with proper cleanup (e.g., consumeGuardViolation())
  • Resource management: Good use of keepAwake() to prevent premature actor sleep during active operations

Issues & Concerns

Critical Issues

1. Inconsistent indentation in workflow/context.ts:59-64

// Current (mixed tabs/spaces):
	const stepConfig = nameOrConfig as StepConfig<T>;
	const config: StepConfig<T> = {
		...stepConfig,
		run: () => this.#withActorAccess(stepConfig.run),
	};

Issue: Line 59 uses spaces while the rest uses tabs. According to CLAUDE.md, hard tabs are required.

2. Potential race condition in QueueManager message deletion

queue-manager.ts:296-305 - Between loading messages and removal, new messages could arrive, making the index-based removal in #removeMessages potentially unsafe if other operations interleave.
Recommendation: Add a comment explaining the thread safety assumptions.

3. Empty catch block with unclear intent

workflow/context.ts:276-278 - While the comment explains intent, silently catching all errors could hide bugs (e.g., network errors during KV operations).
Recommendation: At minimum, log the error at debug level.
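
A minimal sketch of that recommendation, assuming a logger with a `debug` method that accepts structured fields. The `DebugLogger` interface and the `tryOrLogDebug` helper are hypothetical names used for illustration, not part of the PR:

```typescript
// Hypothetical logger shape; the real actor logger may differ.
interface DebugLogger {
	debug(fields: { msg: string; error: string }): void;
}

// Runs `fn`. If it throws, the error is recorded at debug level instead of
// being discarded, and execution continues.
function tryOrLogDebug(log: DebugLogger, msg: string, fn: () => void): void {
	try {
		fn();
	} catch (error) {
		log.debug({
			msg,
			error: error instanceof Error ? error.message : String(error),
		});
	}
}

// Usage: the failure is still swallowed, but it stays visible in debug logs.
const debugEntries: { msg: string; error: string }[] = [];
const debugLog: DebugLogger = { debug: (fields) => debugEntries.push(fields) };
tryOrLogDebug(debugLog, "failed to mark guard triggered", () => {
	throw new Error("state unavailable");
});
```

The control flow matches the existing catch block; the only change is that the swallowed error leaves a trace.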

Major Issues

4. Memory leak potential in QueueManager listeners

queue-manager.ts:256-283 - The waitForNames implementation adds listeners but only cleans them up on resolution/rejection. If the promise never settles due to a bug, listeners accumulate.
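
One way to bound the lifetime of a listener is to route every exit path (match, abort, timeout) through a single cleanup function. The sketch below is not the PR's `waitForNames`; the `NameWaiters` class, its method names, and the timeout parameter are assumptions made for illustration:

```typescript
type NameListener = (messageName: string) => void;

class NameWaiters {
	private readonly listeners = new Set<NameListener>();

	get listenerCount(): number {
		return this.listeners.size;
	}

	// Notifies a snapshot of the listeners so they can remove themselves safely.
	notify(messageName: string): void {
		Array.from(this.listeners).forEach((listener) => listener(messageName));
	}

	// Resolves when `target` is notified; rejects on abort or after `timeoutMs`.
	waitForName(target: string, signal: AbortSignal, timeoutMs: number): Promise<string> {
		return new Promise<string>((resolve, reject) => {
			// Single cleanup path shared by all three outcomes.
			const cleanup = () => {
				this.listeners.delete(listener);
				signal.removeEventListener("abort", onAbort);
				clearTimeout(timer);
			};
			const listener: NameListener = (messageName) => {
				if (messageName !== target) return;
				cleanup();
				resolve(messageName);
			};
			const onAbort = () => {
				cleanup();
				reject(new Error("wait aborted"));
			};
			const timer = setTimeout(() => {
				cleanup();
				reject(new Error("wait timed out"));
			}, timeoutMs);
			if (signal.aborted) {
				onAbort();
				return;
			}
			this.listeners.add(listener);
			signal.addEventListener("abort", onAbort, { once: true });
		});
	}
}

// Usage: aborting removes the listener immediately.
const nameWaiters = new NameWaiters();
const waitController = new AbortController();
const pendingWait = nameWaiters.waitForName("job", waitController.signal, 1000);
pendingWait.catch(() => {
	// The rejection is expected in this example.
});
const countWhileWaiting = nameWaiters.listenerCount;
waitController.abort();
const countAfterAbort = nameWaiters.listenerCount;
```

A timeout is only one possible safety net; whether a hard upper bound on waits is acceptable depends on how the workflow engine uses these waiters.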

5. Missing KV key validation

driver.ts:114-126 - The workflow driver directly constructs keys with makeWorkflowKey(key) but doesn't validate that keys stay within reasonable bounds or don't contain invalid characters.
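
A validation step could look roughly like the following, assuming keys are byte arrays. The size limit and both function names are hypothetical; the PR does not state what bounds the KV layer actually enforces:

```typescript
// Assumed limit for illustration; the PR does not state a maximum key size.
const MAX_WORKFLOW_KEY_BYTES = 1024;

// Throws if the key is empty or longer than the assumed limit.
function assertValidWorkflowKey(key: Uint8Array): void {
	if (key.length === 0) {
		throw new RangeError("workflow key must not be empty");
	}
	if (key.length > MAX_WORKFLOW_KEY_BYTES) {
		throw new RangeError(
			`workflow key is ${key.length} bytes, limit is ${MAX_WORKFLOW_KEY_BYTES}`,
		);
	}
}

// Non-throwing variant: true when the key passes validation.
function isValidWorkflowKey(key: Uint8Array): boolean {
	try {
		assertValidWorkflowKey(key);
		return true;
	} catch {
		return false;
	}
}
```

Calling the check before the key is prefixed would surface a bad key at the driver boundary rather than inside the KV layer.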

6. Unbounded loop in sleep implementation

workflow-engine/index.ts:434-469 - If Date.now() never advances (clock issues, debugger), this could loop infinitely.
Recommendation: Add iteration limit or additional exit condition.
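
The iteration limit can be sketched as below. This is not the engine's sleep implementation: `sleepUntil`, the injected `now` and `wait` functions, and the `SleepOutcome` type are assumptions chosen so the behavior can be exercised with a frozen clock:

```typescript
type SleepOutcome = "elapsed" | "iteration_limit";

// Waits until `deadline` according to `now`, giving up after `maxIterations`
// wake-ups so a clock that never advances cannot loop forever.
async function sleepUntil(
	deadline: number,
	now: () => number,
	wait: (ms: number) => Promise<void>,
	maxIterations: number = 1000,
): Promise<SleepOutcome> {
	for (let i = 0; i < maxIterations; i++) {
		const remaining = deadline - now();
		if (remaining <= 0) return "elapsed";
		await wait(remaining);
	}
	return "iteration_limit";
}

// Usage with a frozen clock: the loop exits after five wake-ups.
let wakeUps = 0;
const frozenClockResult = sleepUntil(
	100,
	() => 0,
	async () => {
		wakeUps++;
	},
	5,
);
```

Returning a distinct outcome lets the caller decide whether hitting the limit is an error or a reason to reschedule.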

Minor Issues

  1. Magic number for worker poll interval (driver.ts:98) - Extract 100 to a named constant with documentation
  2. TODO.md in workflow-engine package - Should be tracked in issue tracker instead
  3. Linear search in message consumption (workflow-engine/storage.ts:379-384) - O(n) for each message, consider indexing by name for large queues
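
For the third item, an index keyed by message name might look like this. It is a sketch only: the `QueuedMessage` shape and the `MessageIndex` class are hypothetical and do not reflect how workflow-engine/storage.ts stores messages:

```typescript
// Hypothetical message shape for illustration.
interface QueuedMessage {
	id: number;
	messageName: string;
}

// Groups messages by name so finding the next message for a name is one map
// lookup rather than a scan over every queued message.
class MessageIndex {
	private readonly byName = new Map<string, QueuedMessage[]>();

	add(message: QueuedMessage): void {
		const bucket = this.byName.get(message.messageName);
		if (bucket) {
			bucket.push(message);
		} else {
			this.byName.set(message.messageName, [message]);
		}
	}

	// Removes and returns the oldest message with this name, if any.
	takeFirst(messageName: string): QueuedMessage | undefined {
		const bucket = this.byName.get(messageName);
		if (!bucket) return undefined;
		const message = bucket.shift();
		if (bucket.length === 0) this.byName.delete(messageName);
		return message;
	}
}

// Usage: messages keep their arrival order within each name.
const messageIndex = new MessageIndex();
messageIndex.add({ id: 1, messageName: "a" });
messageIndex.add({ id: 2, messageName: "b" });
messageIndex.add({ id: 3, messageName: "a" });
```

The index has to be kept in sync with persisted storage, which is the main cost of this approach.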

Performance Considerations

  1. Frequent KV operations - Workflow engine persists state after every operation. For high-throughput workflows, consider batching flushes.
  2. Queue message loading (queue-manager.ts:326-358) - Loads and decodes all messages every time. Consider caching for large queues.
  3. Message consumption - Linear search could be optimized with indexing for actors with many queued messages.
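
The batching idea in the first item can be illustrated with a small write buffer that coalesces repeated writes to the same key and issues one batch per flush. `WriteBuffer` and the synchronous `BatchPut` callback are stand-ins invented for this sketch; the real KV batch call is not shown in this review and is likely asynchronous:

```typescript
// Stand-in for a KV batch write; hypothetical and synchronous for brevity.
type BatchPut = (entries: [string, string][]) => void;

class WriteBuffer {
	private readonly pending = new Map<string, string>();
	private readonly batchPut: BatchPut;

	constructor(batchPut: BatchPut) {
		this.batchPut = batchPut;
	}

	// Later writes to the same key replace earlier ones before the flush.
	set(key: string, value: string): void {
		this.pending.set(key, value);
	}

	// Sends all pending writes as a single batch, then clears the buffer.
	flush(): void {
		if (this.pending.size === 0) return;
		const entries = Array.from(this.pending.entries());
		this.pending.clear();
		this.batchPut(entries);
	}
}

// Usage: three writes to two keys become one batch of two entries.
const flushedBatches: [string, string][][] = [];
const writeBuffer = new WriteBuffer((entries) => flushedBatches.push(entries));
writeBuffer.set("step/1", "running");
writeBuffer.set("step/1", "done");
writeBuffer.set("step/2", "running");
writeBuffer.flush();
```

Buffered writes are lost if the process stops before the flush, so batching only suits state the engine can safely recompute on replay.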

Security Concerns

  1. No size limits on workflow history - Could cause memory/storage exhaustion. Add configurable limits or pruning.
  2. CBOR deserialization - There is message size validation on enqueue (good), but additional runtime limits on decode could help.

Test Coverage Assessment

Strengths: Basic operations, queue integration, state guards, sleep/alarm behavior all tested

Gaps:

  • No tests for workflow rollback/error recovery
  • No tests for concurrent queue operations
  • No tests for very large queue sizes
  • Limited edge case coverage (clock skew, very long sleeps)

Recommendations by Priority

Must Fix Before Merge

  1. ✅ Fix indentation inconsistency (tabs vs spaces)
  2. ⚠️ Add error logging to empty catch blocks
  3. ⚠️ Document thread-safety assumptions in QueueManager

Should Fix Before Merge

  1. Add JSDoc to public APIs (workflow(), ActorWorkflowContext)
  2. Review and improve test coverage for rollback scenarios
  3. Add validation for workflow KV keys

Consider for Follow-up

  1. Performance optimization for large queues
  2. Workflow history pruning/limits
  3. Enhanced observability (metrics, traces)

Overall Assessment

Quality: High ⭐⭐⭐⭐☆

This is a well-designed and well-implemented feature that follows most of the project's conventions. The architecture is clean, the code is generally well-written, and the test coverage is reasonable. The issues listed as critical are small in scope and easy to fix. The main areas for improvement are error handling, performance optimization for scale, and additional test coverage for edge cases.

Recommendation: Approve with minor changes requested.


Review generated by Claude Code. Please verify all suggestions in context.

@NathanFlurry NathanFlurry force-pushed the 01-02-feat_rivetkit_workflows branch from 246d677 to a3fbdd7 Compare January 22, 2026 17:45
@NathanFlurry NathanFlurry force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from 2428e7f to d2ae4eb Compare January 22, 2026 17:45
@claude

claude bot commented Jan 22, 2026

Pull Request Review: feat(rivetkit): integrate workflows in to actors

Overview

This PR integrates the workflow engine into RivetKit actors, allowing actors to use workflow() as their run handler. The implementation closely follows the spec in specs/workflow.md and adds comprehensive test coverage.

Code Quality and Best Practices

✅ Strengths

  1. Well-structured architecture: The implementation cleanly separates concerns with dedicated modules:

    • workflow/mod.ts - Main entry point
    • workflow/context.ts - Context wrapper with deterministic guards
    • workflow/driver.ts - Driver bridging actor and workflow systems
    • actor/instance/queue-manager.ts - Queue management
  2. Type safety: Strong TypeScript typing throughout, properly propagating generics across the actor system.

  3. Comprehensive testing: Good test coverage with dedicated test suites for:

    • Basic workflow functionality
    • Queue message consumption
    • Sleep/resume behavior
    • Run handler lifecycle
  4. Deterministic guards: The #ensureActorAccess() mechanism in ActorWorkflowContext properly prevents non-deterministic state access outside workflow steps.

⚠️ Areas for Improvement

1. Error Handling in workflow/mod.ts (Lines 67-78)

The error handling could be more robust:

runCtx.waitUntil(
    handle.result
        .then(() => {
            // Ignore normal completion; the actor will be restarted if needed.
        })
        .catch((error) => {
            runCtx.log.error({
                msg: "workflow run failed",
                error: stringifyError(error),
            });
        }),
);

Issue: When a workflow fails, only logging occurs. The actor may remain in an inconsistent state.

Recommendation: Consider whether the actor should be explicitly destroyed/restarted on workflow failure. Review if this aligns with the spec's statement: "when it finishes the actor's run promise will resolve and crash the actor as today."

2. Inconsistent indentation in workflow/mod.ts (Lines 67-82)

	runCtx.waitUntil(
		handle.result
			.then(() => {
				// Ignore normal completion; the actor will be restarted if needed.
			})
			.catch((error) => {
				runCtx.log.error({
					msg: "workflow run failed",
					error: stringifyError(error),
				});
			}),
	);

		return await new Promise<never>(() => {
			// Intentionally never resolve to keep the run handler alive.
		});
	};
}

Issue: Extra tab before the return statement on line 80.

Fix: Remove the extra indentation.

3. Queue name handling in driver.ts

The stripWorkflowQueueName function (lines 18-23) returns null for non-workflow messages, and the caller handles this with a falsy check rather than an explicit null comparison:

const workflowName = stripWorkflowQueueName(queueMessage.name);
if (!workflowName) continue;

Observation: The continue handles the null case correctly, but a falsy check also skips an empty-string name. An explicit comparison against null would make the intent clearer.
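
A sketch of the explicit comparison follows. The prefix value and the body of `stripWorkflowQueueName` are stand-ins written for this example, since the review does not show the real function; `collectWorkflowNames` is likewise hypothetical:

```typescript
// Hypothetical prefix; the actual value used by driver.ts is not shown here.
const WORKFLOW_QUEUE_PREFIX = "workflow:";

// Stand-in with the same contract the review describes: returns the name
// without the prefix, or null for non-workflow queue names.
function stripWorkflowQueueName(queueName: string): string | null {
	return queueName.startsWith(WORKFLOW_QUEUE_PREFIX)
		? queueName.slice(WORKFLOW_QUEUE_PREFIX.length)
		: null;
}

// Keeps every workflow name, skipping only names the helper rejected.
function collectWorkflowNames(queueNames: string[]): string[] {
	const workflowNames: string[] = [];
	for (const queueName of queueNames) {
		const workflowName = stripWorkflowQueueName(queueName);
		// Explicit null check: an empty string is not treated as "not a workflow".
		if (workflowName === null) continue;
		workflowNames.push(workflowName);
	}
	return workflowNames;
}

const collectedNames = collectWorkflowNames(["workflow:orders", "other", "workflow:"]);
```

With `!workflowName` the third entry would be skipped as well, because an empty string is falsy. Whether an empty workflow name should be accepted is a separate decision; the explicit check simply makes that decision visible.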

4. BigInt parsing in driver.ts (Lines 75-81)

const ids = messageIds.map((id) => {
    try {
        return BigInt(id);
    } catch {
        return null;
    }
});

Issue: Silent error handling without logging could make debugging difficult.

Recommendation: Consider logging when message ID parsing fails, especially since invalid IDs suggest a serious bug.
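
A minimal sketch of that recommendation, assuming the caller supplies a callback that does the logging. `parseMessageIds` and `onInvalid` are hypothetical names:

```typescript
// Parses string IDs into bigints, reporting each ID that fails to parse
// instead of dropping it silently.
function parseMessageIds(
	messageIds: string[],
	onInvalid: (id: string) => void,
): bigint[] {
	const parsedIds: bigint[] = [];
	for (const id of messageIds) {
		try {
			parsedIds.push(BigInt(id));
		} catch {
			onInvalid(id);
		}
	}
	return parsedIds;
}

// Usage: the invalid ID is reported and the valid ones are kept.
const invalidIds: string[] = [];
const parsedMessageIds = parseMessageIds(["1", "abc", "42"], (id) => invalidIds.push(id));
```

Note that `BigInt("")` evaluates to zero rather than throwing, so an empty string would pass through this check and may need separate handling.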

Potential Bugs

🔴 Critical

1. Race condition in queue message deletion (queue-manager.ts:291-305)

async deleteMessagesById(ids: bigint[]): Promise<bigint[]> {
    if (ids.length === 0) {
        return [];
    }
    const idSet = new Set(ids.map((id) => id.toString()));
    const entries = await this.#loadQueueMessages();
    const toRemove = entries.filter((entry) =>
        idSet.has(entry.id.toString()),
    );
    if (toRemove.length === 0) {
        return [];
    }
    await this.#removeMessages(toRemove);
    return toRemove.map((entry) => entry.id);
}

Issue: Between loading messages and removing them, new messages could be enqueued, potentially causing metadata inconsistencies.

Impact: Medium - Could lead to incorrect queue size tracking.

Recommendation: Consider adding transactional semantics or at least document this race condition.
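
Because JavaScript runs one task at a time, interleaving here can only happen at `await` points. One lightweight way to rule it out is to chain queue mutations so each one starts only after the previous one settles. The `SerialExecutor` class below is a hypothetical sketch of that pattern, not code from the PR:

```typescript
// Runs async tasks one at a time in submission order.
class SerialExecutor {
	private tail: Promise<unknown> = Promise.resolve();

	run<T>(task: () => Promise<T>): Promise<T> {
		// `tail` never rejects, so the task always runs after its predecessor.
		const result = this.tail.then(task);
		this.tail = result.catch(() => undefined);
		return result;
	}
}

// Usage: the enqueue cannot start between the two halves of the delete.
const operationOrder: string[] = [];
const queueExecutor = new SerialExecutor();
const deleteOperation = queueExecutor.run(async () => {
	operationOrder.push("delete:load");
	await Promise.resolve(); // stands in for the KV read
	operationOrder.push("delete:remove");
});
const enqueueOperation = queueExecutor.run(async () => {
	operationOrder.push("enqueue");
});
```

A failed task does not block later ones: its rejection reaches the caller through the returned promise, while the internal chain continues.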

🟡 Medium

2. Metadata rebuild doesn't update nextId correctly (queue-manager.ts:434-445)

let maxId = 0n;
for (const [key] of entries) {
    try {
        const messageId = decodeQueueMessageKey(key);
        if (messageId > maxId) {
            maxId = messageId;
        }
    } catch {
        // Skip malformed keys
    }
}
this.#metadata.nextId = maxId + 1n;

Issue: Gaps in message IDs left by deletions do not affect the result, because nextId is derived from the highest remaining ID. However, if entries is empty, maxId stays at 0n and nextId becomes 1n, which might not match the nextId in the default metadata.

Impact: Low - Only affects recovery from corrupted state.

Recommendation: Consider explicitly checking if entries is empty and resetting to default metadata.
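
The explicit empty check could be sketched as follows. `rebuildNextId` is a hypothetical helper, and the default value is passed in as a parameter because the review does not say what the default metadata contains:

```typescript
// Returns the next message ID after a rebuild. With no surviving messages it
// falls back to the caller-supplied default instead of assuming a value.
function rebuildNextId(messageIds: bigint[], defaultNextId: bigint): bigint {
	if (messageIds.length === 0) {
		return defaultNextId;
	}
	let maxId = messageIds[0] as bigint;
	for (const messageId of messageIds) {
		if (messageId > maxId) {
			maxId = messageId;
		}
	}
	return maxId + BigInt(1);
}

// Usage: gaps are irrelevant, only the highest ID matters.
const nextIdWithGaps = rebuildNextId([BigInt(3), BigInt(9), BigInt(4)], BigInt(1));
const nextIdWhenEmpty = rebuildNextId([], BigInt(7));
```

Passing the default in keeps the rebuild path and the initial metadata from drifting apart if the default ever changes.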

3. State mutation attempt during guard violation (workflow/context.ts:266-279)

#markGuardTriggered(): void {
    try {
        const state = this.#runCtx.state as Record<string, unknown>;
        if (
            state &&
            typeof state === "object" &&
            "guardTriggered" in state
        ) {
            (state as Record<string, unknown>).guardTriggered = true;
        }
    } catch {
        // Ignore if state is unavailable
    }
    // ... KV write
}

Issue: This method attempts to mutate state outside a workflow step, which seems contradictory to the guard's purpose. The KV write is appropriate, but state mutation should probably not happen here.

Impact: Medium - Could lead to inconsistent state behavior.

Recommendation: Remove the state mutation part, keep only the KV write for persistence.

Performance Considerations

✅ Good Practices

  1. Batch operations: The implementation uses kvBatchPut, kvBatchGet, and kvBatchDelete for efficient KV operations.

  2. Metadata caching: Queue metadata is kept in memory (#metadata) to avoid repeated reads.

  3. Message listener optimization: Uses Set for efficient name lookups in message listeners.

⚠️ Potential Optimizations

1. Repeated queue scans (queue-manager.ts)

The #loadQueueMessages() method scans all queue messages multiple times:

  • In receive() for immediate drain
  • In #maybeResolveWaiters() for each waiter
  • In deleteMessagesById() to find messages

Impact: Could be inefficient with large queues.

Recommendation: Consider caching loaded messages within a single operation cycle, or implementing an in-memory index.
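
Caching within one operation cycle can be approximated by memoizing the in-flight load and invalidating it on every mutation. `CachedLoader` is a hypothetical helper, and the loader passed to it stands in for `#loadQueueMessages()`:

```typescript
// Memoizes the result of an async loader until `invalidate` is called.
class CachedLoader<T> {
	private cached: Promise<T> | undefined;
	private readonly loadFn: () => Promise<T>;

	constructor(loadFn: () => Promise<T>) {
		this.loadFn = loadFn;
	}

	// Reuses the pending or completed load if one exists.
	load(): Promise<T> {
		if (this.cached === undefined) {
			this.cached = this.loadFn();
		}
		return this.cached;
	}

	// Must be called after any enqueue or delete so the next load rescans.
	invalidate(): void {
		this.cached = undefined;
	}
}

// Usage: two loads share one scan; a mutation forces a fresh scan.
let scanCount = 0;
const queueLoader = new CachedLoader<string[]>(async () => {
	scanCount++;
	return ["message-1", "message-2"];
});
queueLoader.load();
queueLoader.load();
const scansBeforeInvalidate = scanCount;
queueLoader.invalidate();
queueLoader.load();
const scansAfterInvalidate = scanCount;
```

The correctness of this pattern depends on every mutation path calling `invalidate`, so it is only safe if all queue writes go through one place.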

2. WorkflowQueueName prefix check (driver.ts:49)

Every message load loops through messages and checks the prefix:

for (const queueMessage of queueMessages) {
    const workflowName = stripWorkflowQueueName(queueMessage.name);
    if (!workflowName) continue;
    // ...
}

Impact: Minor overhead on every message load.

Suggestion: Consider using a prefix-based query if the underlying KV system supports it.

Security Concerns

✅ No Critical Issues

  1. Input validation: Queue message size limits are enforced (maxQueueMessageSize).
  2. CBOR serialization safety: Uses isCborSerializable to validate before encoding.
  3. Abort handling: Proper cleanup of listeners and timeouts on abort.

💡 Suggestions

1. Message ID validation

In driver.ts:75-81, invalid BigInt conversions are silently ignored. Consider adding rate limiting if many invalid IDs are detected, as this could indicate an attack or bug.

2. Queue size limits

The queue enforces maxQueueSize and maxQueueMessageSize, which is good. Ensure these limits are documented and reasonable defaults are set.

Test Coverage

✅ Comprehensive Testing

  1. Run handler tests (actor-run.ts):

    • Handler startup and ticking
    • Graceful exit on abort
    • Queue consumption
    • Error handling
  2. Workflow tests (actor-workflow.ts):

    • Step replay and guard enforcement
    • Queue message consumption via workflow listen
    • Sleep and resume behavior
  3. Fixture actors provide good examples:

    • Counter actor with loop and state access guards
    • Queue consumer actor
    • Sleep actor

📝 Recommendations

  1. Add edge case tests:

    • Workflow with very long history (test replay performance)
    • Concurrent message arrival during workflow execution
    • Actor restart mid-workflow-step
  2. Add error scenario tests:

    • KV operation failures
    • Corrupted workflow state
    • Message deserialization errors
  3. Add integration tests:

    • Multiple actors with workflows interacting
    • High-frequency message delivery
    • Workflow migration/upgrade scenarios

Documentation

✅ Good Documentation

  1. Specification document (specs/workflow.md) is comprehensive and well-structured.
  2. Lifecycle documentation (website/src/content/docs/actors/lifecycle.mdx) was updated with 164 new lines.
  3. TODO document (workflow-engine/TODO.md) tracks remaining work.

📝 Suggestions

  1. Add JSDoc comments to public APIs in workflow/mod.ts, especially the workflow() function.
  2. Document the workflow prefix convention for queue names.
  3. Add examples showing common workflow patterns (retry, timeout, fan-out/fan-in).

Additional Observations

Code Style Compliance

  1. Import organization: ✅ Imports are properly organized at the top of files.
  2. Naming conventions: ✅ Follows kebab-case for file names, camelCase for variables.
  3. Structured logging: ✅ Uses structured logging with msg and contextual fields.
  4. Comment style: ⚠️ Some comments could be more concise (e.g., avoid fragments).

Architecture Alignment

The implementation aligns well with the specification:

  • ✅ Single workflow per actor
  • ✅ Deterministic replay with guards
  • ✅ Queue-based message delivery
  • ✅ Actor alarm integration
  • ✅ Logging integration
  • ✅ "Live" mode execution

Summary

This is a solid implementation that successfully integrates workflows into RivetKit actors. The code is well-structured, type-safe, and includes good test coverage.

Priority Fixes

  1. HIGH: Fix indentation issue in workflow/mod.ts:80
  2. MEDIUM: Clarify workflow failure behavior - should actor be destroyed?
  3. MEDIUM: Remove state mutation in #markGuardTriggered()
  4. LOW: Add logging for BigInt parsing failures
  5. LOW: Document race condition in deleteMessagesById()

Recommendations

  1. Add more edge case and error scenario tests
  2. Add JSDoc comments to public APIs
  3. Consider caching queue messages to reduce repeated scans
  4. Review and test workflow error handling behavior

Overall: Approve with minor changes requested

The implementation is production-ready with the suggested fixes applied. The architecture is sound and the code quality is high.

@pkg-pr-new

pkg-pr-new bot commented Jan 22, 2026

More templates

@rivetkit/virtual-websocket

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/virtual-websocket@4006

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@4006

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@4006

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@4006

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@4006

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@4006

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@4006

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@4006

@rivetkit/workflow-engine

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/workflow-engine@4006

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@4006

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@4006

commit: ba95c7a

@NathanFlurry NathanFlurry force-pushed the 01-02-feat_rivetkit_workflows branch from a3fbdd7 to b4b061f Compare January 24, 2026 23:35
@NathanFlurry NathanFlurry force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from d2ae4eb to 2bdce1c Compare January 24, 2026 23:35
@NathanFlurry NathanFlurry force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from 2bdce1c to 9524546 Compare January 28, 2026 01:12
@NathanFlurry NathanFlurry force-pushed the 01-02-feat_rivetkit_workflows branch from b4b061f to 8a67460 Compare January 28, 2026 01:12
@jog1t jog1t force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from 9524546 to e9cb986 Compare January 28, 2026 19:55
@jog1t jog1t force-pushed the 01-02-feat_rivetkit_workflows branch from 8a67460 to a5391dd Compare January 28, 2026 19:55
@NathanFlurry NathanFlurry force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from e9cb986 to 7ff09e8 Compare January 30, 2026 08:19
@NathanFlurry NathanFlurry force-pushed the 01-02-feat_rivetkit_workflows branch from a5391dd to 18bb55f Compare January 30, 2026 08:19
@jog1t jog1t force-pushed the 01-02-feat_rivetkit_workflows branch from 18bb55f to 177df46 Compare January 30, 2026 21:29
@jog1t jog1t force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from 7ff09e8 to 036582c Compare January 30, 2026 21:29
@NathanFlurry NathanFlurry changed the base branch from 01-02-feat_rivetkit_workflows to graphite-base/4006 February 3, 2026 19:40
@NathanFlurry NathanFlurry mentioned this pull request Feb 3, 2026
11 tasks
@NathanFlurry NathanFlurry force-pushed the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch from 036582c to ba95c7a Compare February 3, 2026 20:02
@NathanFlurry NathanFlurry changed the base branch from graphite-base/4006 to 01-02-feat_rivetkit_workflows February 3, 2026 20:02
@graphite-app
Contributor

graphite-app bot commented Feb 4, 2026

Merge activity

  • Feb 4, 8:36 PM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Feb 4, 8:37 PM UTC: CI is running for this pull request on a draft pull request (#4114) due to your merge queue CI optimization settings.
  • Feb 4, 8:38 PM UTC: Merged by the Graphite merge queue via draft PR: #4114.

@graphite-app graphite-app bot closed this Feb 4, 2026
@graphite-app graphite-app bot deleted the 01-21-feat_rivetkit_integrate_workflows_in_to_actors branch February 4, 2026 20:38