Skip to content

executor engine rebased#1210

Open
debajyoti0606 wants to merge 2 commits intomainfrom
executor-engine-v3
Open

executor engine rebased#1210
debajyoti0606 wants to merge 2 commits intomainfrom
executor-engine-v3

Conversation

@debajyoti0606
Copy link
Contributor

@debajyoti0606 debajyoti0606 commented Nov 11, 2025

Description

Testing

Additional Notes

Summary by CodeRabbit

  • New Features

    • Added workflow template creation, management, and execution APIs with step-level editing and deletion.
    • Introduced execution engine for workflow processing with message queue-based orchestration.
    • Implemented 12+ workflow tools including email, Gmail, forms, Jira, HTTP requests, schedulers, manual triggers, and AI agent integrations.
    • Added workflow execution details retrieval and engine health check endpoints.
  • Chores

    • Updated queue configurations from expireInHours to expireInSeconds and monitorStateIntervalMinutes to monitorIntervalSeconds.
    • Updated pg-boss dependency from v10 to v11.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 11, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

This PR introduces a complete workflow execution engine with message-queue-based orchestration, comprehensive tool implementations, and server APIs for template management, step lifecycle, and execution control. It also updates job queue expiration configurations from hours to seconds for consistency.

Changes

Cohort / File(s) Summary
Queue Configuration Updates
server/queue/boss.ts, server/queue/api-server-queue.ts, server/queue/index.ts
Replaces expireInHours with expireInSeconds and updates PgBoss configuration option monitorStateIntervalMinutes to monitorIntervalSeconds for consistency with PgBoss 11.x.
Job Scheduling Updates
server/api/knowledgeBase.ts, server/api/oauth.ts
Converts job expiration from expireInHours: 12 to expireInSeconds: 43200 and expireInHours * 3600 equivalents across file processing and OAuth queue operations.
Execution Engine Core
server/execution-engine/server.ts, server/execution-engine/message-queue.ts, server/execution-engine/execution-client.ts
Establishes a new standalone Hono-based execution engine server with PgBoss-backed message queue, client for publishing execution requests, and health check endpoints.
Execution Engine Orchestration
server/execution-engine/communication-service.ts, server/execution-engine/execution-engine-queue.ts, server/execution-engine/workflow-executor.ts, server/execution-engine/step-executor.ts
Implements message routing (START_EXECUTION, STOP_EXECUTION, MANUAL_TRIGGER), execution queue management with job scheduling, workflow template execution coordination, and step execution with multi-input aggregation and transactional updates.
Execution Engine Triggers & Types
server/execution-engine/triggers.ts, server/execution-engine/types.ts
Adds trigger handlers for manual step advancement and template state transitions; defines core execution interfaces (ExecutionContext, ExecutionPacket, StepExecutionResult, WorkflowExecutionState).
Workflow Template APIs
server/api/workflow-template.ts
Provides CRUD endpoints and state management for workflow templates, tool type discovery, schema validation, and structured response formatting with nested steps and connections.
Workflow Template Step APIs
server/api/workflow-template-step.ts
Implements step lifecycle management (add, update, delete) with transactional integrity, connection/link management, route validation, and per-step tool association.
Workflow Execution APIs
server/api/workflow-execution.ts
Exposes execution control endpoints (start template, stop execution, health check, manual trigger) and detailed execution status retrieval with step/tool execution metrics and duration calculations.
Workflow Tool Implementations
server/workflow-tools/{agent, ai-agent, delay, email, form, gmail, http-request, jira, manual-trigger, merged-node, scheduler-trigger, slack, switch, webhook}.ts
Introduces 14 concrete tool implementations covering triggers (MANUAL_TRIGGER, SCHEDULER_TRIGGER), actions (EMAIL, GMAIL, SLACK, HTTP_REQUEST, JIRA, AGENT, AI_AGENT, FORM), and system tools (SWITCH, DELAY, MERGED_NODE, WEBHOOK) with input/output/config schemas and execute methods.
Workflow Tool Registry & Types
server/workflow-tools/registry.ts, server/workflow-tools/types.ts
Centralizes tool registration, discovery, and metadata with helpers (getTool, isToolSupported, getToolsByCategory); defines WorkflowTool interface, WorkflowContext, and ToolExecutionResult types.
Database Schema Extensions
server/db/schema/workflows.ts, server/db/workflow.ts
Adds enums (ToolCategory, TemplateState) and new fields (state, deprecated) to templates/steps; extends WorkflowStatus (WAITING), ToolType (SWITCH, MANUAL_TRIGGER, SCHEDULER_TRIGGER), and ToolExecutionStatus (AWAITING_USER_INPUT); adds helper for template step access checks.
Configuration & Types
server/config.ts, server/types.ts, server/types/workflowTypes.ts
Exposes executionEnginePort/Host from environment; adds ExecutionEngine subsystem enum; introduces ToolCategory and TemplateState enums plus WorkflowStatus/ToolType/ToolExecutionStatus extensions.
Server Integration
server/server.ts, server/package.json
Wires new workflow APIs into main router; initializes message queue on startup; adds dev scripts for execution engine server; upgrades pg-boss dependency to ^11.1.1.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant MainServer as Main Server
    participant MessageQueue as Message Queue
    participant ExecEngine as Execution Engine
    participant Database as Database

    Client->>MainServer: POST /workflow/template/execute
    MainServer->>MainServer: Validate & authenticate
    MainServer->>MessageQueue: publishExecution (START_EXECUTION)
    MainServer-->>Client: Return executionId
    
    ExecEngine->>MessageQueue: Poll for messages
    MessageQueue-->>ExecEngine: START_EXECUTION
    ExecEngine->>ExecEngine: handleStartExecution
    ExecEngine->>Database: Create workflow_execution
    ExecEngine->>Database: Create step_executions
    ExecEngine->>ExecEngine: queueNextSteps (root steps)
    ExecEngine->>MessageQueue: publishResponse
    
    ExecEngine->>ExecEngine: Process root step packet
    ExecEngine->>ExecEngine: StepExecutor.executeStep
    ExecEngine->>ExecEngine: Execute tool (EmailTool, FormTool, etc.)
    ExecEngine->>Database: Create tool_execution record
    ExecEngine->>ExecEngine: Determine nextAction
    
    alt nextAction = continue
        ExecEngine->>ExecEngine: queueNextSteps (next steps)
    else nextAction = wait_for_input
        ExecEngine->>Database: Mark step WAITING
    end

    Client->>MainServer: POST /workflow/:id/manual-trigger/:stepId
    MainServer->>ExecEngine: triggerManualStep
    ExecEngine->>ExecEngine: queueNextSteps (pending step)
    ExecEngine->>Database: Update execution status
    ExecEngine-->>MainServer: Success
    MainServer-->>Client: Trigger acknowledged
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Areas requiring extra attention:

  • server/execution-engine/ subsystem — New distributed execution engine with complex queue management, message routing, and transactional workflow orchestration; multi-input aggregation logic in step-executor.ts is particularly dense.
  • server/workflow-tools/ implementations — 14 new tool classes with varying levels of scaffolding (some return mocked results); verify consistency of interface implementations, error handling patterns, and trigger handler logic (especially scheduler-trigger.ts).
  • server/api/workflow-template-step.ts — Sophisticated step lifecycle with connection route management, transactional updates, and graph metadata manipulation; verify next/prev step link integrity across delete/update operations.
  • Database schema migrations — New enums, fields, and optional foreign keys (workflowToolId no longer required in tool_execution); ensure backward compatibility and migration path.
  • Job queue expiration refactoring — Changes across 5+ files from expireInHours to expireInSeconds; verify all invocations use correct unit conversion (e.g., * 3600).
  • Integration points — Message queue initialization, execution engine startup, and router wiring in server/server.ts; ensure lifecycle management (startup, shutdown, error recovery).

Possibly related PRs

Suggested reviewers

  • zereraz
  • shivamashtikar
  • kalpadhwaryu
  • junaid-shirur
  • devesh-juspay

Poem

🐰 A workflow engine hops to life,
With queues and steps, no more the strife,
From trigger springs to mail that flies,
Each tool a gift of grand reprise!
Execution flows through rabbit's delight,
Orchestrated perfectly, day and night.

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'executor engine rebased' is vague and does not clearly describe the substantive changes in the pull request, which involve extensive workflow execution infrastructure including new APIs, database schema updates, and tool implementations. Consider a more descriptive title such as 'Add workflow execution engine with template management, step orchestration, and tool registry' to clearly communicate the scope and nature of the changes.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 80.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch executor-engine-v3

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @debajyoti0606, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust and scalable workflow execution engine, fundamentally changing how automated processes are handled. It establishes a new communication layer using PgBoss for asynchronous job processing, defines a flexible structure for workflow templates and steps, and provides a rich set of API endpoints for full lifecycle management of workflows. The changes lay the groundwork for highly customizable and extensible automation capabilities within the application.

Highlights

  • New Workflow Execution Engine: Introduced a comprehensive workflow execution engine, enabling the definition, execution, and management of complex automated processes within the application. This includes a new architecture for inter-service communication and a redesigned database schema.
  • PgBoss Version Upgrade and Configuration: Upgraded the PgBoss library to version 11.1.1 and updated its configuration across various queue-related files, transitioning from monitorStateIntervalMinutes to monitorIntervalSeconds and expireInHours to expireInSeconds for job scheduling.
  • Extensive Workflow API Endpoints: Added a full suite of new API routes for managing workflows, including endpoints for creating, updating, deleting, retrieving, and validating workflow templates and steps. New APIs also support executing workflows, checking their status and details, stopping them, and handling manual triggers.
  • Modular Workflow Tool Definitions: Defined a modular system for various workflow tools (e.g., Email, AI Agent, Form, Delay, Slack, Gmail, Agent, Merged Node, Switch, Manual Trigger, Scheduler Trigger, Webhook, HTTP Request, Jira), each with its own configuration, input/output schemas, and execution logic, enhancing extensibility.
  • Database Schema Enhancements: Updated the workflows.ts database schema to support the new execution engine, incorporating new enums for ToolCategory and TemplateState, and adding fields like state and deprecated for templates/steps, and category, inputCount, and input for tools and tool executions.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new executor engine, which is a significant addition to the codebase. The changes are extensive, including new API endpoints, database schema modifications, and a new message queue-based architecture for background processing. The overall structure is well-designed, but there are several areas for improvement regarding error handling, efficiency, and code correctness. I've identified a critical bug in the template validation logic and other high-severity issues related to input validation and fragile implementation details. Additionally, there are opportunities to improve code readability and maintainability.

Comment on lines 811 to 814
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There appears to be a bug in this condition. target is a UUID, but it's being compared to node.name, which is a string. This comparison will always be false. It should likely be target === node.uuid. Similarly, workflowNodeNames is a Set of string names, but source is a UUID, so workflowNodeNames.has(source) will also always be false. You should create a set of UUIDs for this check.

        const workflowNodeUUIDs = new Set(workflow.map(n => n.uuid))
        return !connections.some((conn: string[]) => {
          const [[source,], [target,]] = extractUUIDFromArray(conn)
          return target === node.uuid && workflowNodeUUIDs.has(source)
        })

Comment on lines +27 to +32
const requestData = await c.req.json()
const { templateId } = requestData

if (!templateId) {
throw new HTTPException(400, { message: "templateId is required in request body" })
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The request body is parsed from JSON without any validation. This can lead to runtime errors if the request body is malformed or missing required fields. It's a good practice to use a validation library like zod to define a schema for the request body and validate it. This pattern is already used in other new files like server/api/workflow-template-step.ts. This would improve type safety and provide clear error messages for invalid requests. This feedback also applies to other handlers in this file that process request bodies or parameters without validation.

Comment on lines +608 to +618
const connections: string[] = []
steps.forEach(step => {
if (step.nextStepIds && step.nextStepIds.length > 0) {
step.nextStepIds.forEach(nextStepId => {
const nextStep = steps.find(s => s.id === nextStepId)
if (nextStep) {
connections.push(`${step.name}-${nextStep.name}`)
}
})
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Reconstructing connections using step names (e.g., ${step.name}-${nextStep.name}) is fragile because step names are not guaranteed to be unique within a workflow template. If two steps have the same name, this logic will lead to incorrect connections being created. It would be more robust to use the unique step IDs for this purpose.

{
retryLimit: 3,
expireInHours: 12,
expireInSeconds: 43200,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, it's recommended to avoid using magic numbers. The value 43200 can be expressed as a calculation, which makes the intent clearer. This is consistent with the approach taken in server/api/oauth.ts (JobExpiryHours * 3600). This comment also applies to other occurrences of 43200 in this file.

        expireInSeconds: 12 * 60 * 60,

)

if (!template) {
throw new Error("Template not found or access denied")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Throwing a generic Error within a database transaction will result in a generic 500 Internal Server Error response to the client. It's better to use HTTPException with a specific status code (e.g., 404 for 'not found', 400 for 'bad request') to provide more meaningful feedback to the API consumer. This allows the client to handle different error scenarios appropriately. This applies to other throw new Error(...) statements within this file's API handlers.

Suggested change
throw new Error("Template not found or access denied")
throw new HTTPException(404, { message: "Template not found or access denied" })

Comment on lines 437 to 440
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This query fetches all tools for the current workspace and then filters them in memory. This can be inefficient if the workspace contains a large number of tools. Since you have the IDs of the newly created tools in toolIdMap, you can fetch only the relevant tools directly from the database using inArray.

Suggested change
const createdTools = await db
.select()
.from(workflowTool)
.where(eq(workflowTool.workspaceId, user.workspaceId))
const createdTools = toolIdMap.size > 0 ? await db
.select()
.from(workflowTool)
.where(inArray(workflowTool.id, Array.from(toolIdMap.values())))
: []

Comment on lines +159 to +164
const mockContext = {
req: {
param: () => ({ workflowId: request.workflowId, stepId: request.stepId })
},
json: (data: any) => data
} as any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating a mock Context object to call handleManualTrigger couples this service to the implementation details of an API handler. This is considered a code smell. A better approach would be to extract the core logic from handleManualTrigger into a separate function that can be called directly by both this service and the Hono API handler. This would improve modularity and make the code easier to test and maintain.

Comment on lines +64 to +136
async pollForResponse(correlationId: string, timeoutMs: number = 30000): Promise<ExecutionResponse> {
if (!this.checkInitialized()) {
throw new Error("Message queue not initialized")
}

const startTime = Date.now()
const pollInterval = 500 // Poll every 500ms

return new Promise((resolve, reject) => {
const poll = async () => {
try {
// Get the internal database connection from PgBoss
const db = this.getDatabase()

// Direct SQL query to find job by correlation ID
// Using JSONB operator ->> for efficient correlation ID filtering
const findJobSql = `
SELECT id, data, state, created_on
FROM pgboss.job
WHERE name = $1
AND state = 'created'
AND data->>'correlationId' = $2
ORDER BY created_on ASC
LIMIT 1
`

const result = await db.executeSql(findJobSql, [this.OUTGOING_QUEUE, correlationId])

if (result?.rows?.length > 0) {
const job = result.rows[0]
const jobData = job.data as ExecutionResponse

// Extract response data
const response: ExecutionResponse = {
success: jobData.success,
executionId: jobData.executionId,
error: jobData.error,
data: jobData.data
}

// Mark job as completed using direct SQL
const completeJobSql = `
UPDATE pgboss.job
SET state = 'completed',
completed_on = now()
WHERE id = $1
`

await db.executeSql(completeJobSql, [job.id])

Logger.info(`Response found and processed for correlation ID: ${correlationId}`)
resolve(response)
return
}

// Check timeout
if (Date.now() - startTime >= timeoutMs) {
reject(new Error(`Response timeout for correlation ID: ${correlationId}`))
return
}

// Continue polling
setTimeout(poll, pollInterval)

} catch (error) {
Logger.error(error, `Error polling for response ${correlationId}`)
reject(error)
}
}

poll()
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The pollForResponse function relies on direct SQL queries against the internal schema of pg-boss. This is a fragile approach, as the pgboss.job table structure could change in future versions of the library, which would break this code. While this might be a deliberate choice for performance, it's worth noting the risk. A safer, though potentially less performant, approach would be to use the public API of pg-boss if it provides a way to fetch jobs by custom attributes.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 27

🧹 Nitpick comments (12)
server/workflow-tools/form.ts (1)

79-92: Preserve field configuration in error case.

When an error occurs, the error response returns an empty fields array, losing the form configuration. This makes it difficult for the client to understand what form was expected.

Apply this diff:

   } catch (error) {
     return {
       status: ToolExecutionStatus.FAILED,
       output: {
         form_id: "",
         form_title: config.title || "Form Input Required",
         form_description: config.description || "Please fill out the form",
-        fields: [],
+        fields: config.fields || [],
         submitted: false,
         submission_data: {},
         error: error instanceof Error ? error.message : String(error),
       },
     }
   }
server/workflow-tools/webhook.ts (1)

46-68: Implement webhook execution logic and add schema validation.

The webhook tool currently returns mock data and lacks input/config validation. The TODO at line 52 indicates this is incomplete.

Two concerns:

  1. Missing implementation: The tool returns mock success data without making actual HTTP requests.
  2. No validation: Like the FormTool, the defined inputSchema and configSchema are not used to validate parameters.

Apply this diff to add validation:

 async execute(
   input: Record<string, any>,
   config: Record<string, any>,
   workflowContext: WorkflowContext
 ): Promise<ToolExecutionResult> {
   try {
+    // Validate input and config
+    const validatedInput = this.inputSchema.parse(input)
+    const validatedConfig = this.configSchema.parse(config)
+
     // TODO: Implement webhook execution logic
     return {

Would you like me to generate a complete implementation of the webhook HTTP request logic, including timeout handling, retry logic, and proper error responses?

server/execution-engine/communication-service.ts (1)

123-136: Reminder: Implement stop execution logic.

The TODO at line 125 indicates that stop execution functionality is not yet implemented. This is a critical feature for workflow management.

Do you want me to generate an implementation for stopping workflow executions, or should this be tracked in a separate issue?

server/workflow-tools/agent.ts (1)

35-47: Stub implementation returns mock data.

The execute method returns a hardcoded response without performing any actual agent interaction. The configured agentId, prompt, and timeout are ignored.

This appears to be a placeholder for future implementation. Would you like me to help design the actual agent integration logic, or should this be tracked in a separate issue?

server/execution-engine/index.ts (2)

67-69: Remove commented-out code.

These commented export statements should either be uncommented (if needed) or removed entirely to keep the codebase clean.

Apply this diff to remove the commented lines:

-// Named and default export
-// export { ExecutionEngine }
-// export default ExecutionEngine

52-58: Consider returning comprehensive engine status.

The getEngineStatus method only returns service status but not queue, executor, or client status. This may provide an incomplete picture of the engine's health.

Consider expanding the status object to include other subsystem states:

static async getEngineStatus() {
  const { communicationService } = await import("./communication-service")
  const { messageQueue } = await import("./message-queue")
  const { executionClient } = await import("./execution-client")

  return {
    service: communicationService.getServiceStatus(),
    queue: messageQueue.getQueueStatus?.() || { status: 'unknown' },
    client: { connected: await executionClient.healthCheck() }
  }
}
server/workflow-tools/http-request.ts (1)

31-37: Consider adding URL validation in the input schema.

While the URL will need runtime SSRF checks, adding basic format validation in the schema provides an early safety layer.

   inputSchema = z.object({
-    url: z.string(),
+    url: z.string().url("Must be a valid URL"),
     method: z.enum(["GET", "POST", "PUT", "DELETE", "PATCH"]).default("GET"),
     headers: z.record(z.string(), z.string()).optional(),
     body: z.any().optional(),
     params: z.record(z.string(), z.string()).optional(),
   })
server/workflow-tools/switch.ts (1)

77-101: Consider using strict equality operators.

Lines 81 and 83 use loose equality (== and !=) which can lead to unexpected type coercion. For example, "5" == 5 evaluates to true, which may not be the intended behavior in workflow conditions.

If loose equality is intentional for flexible comparisons, document this behavior. Otherwise, use strict equality:

         switch (operator) {
           case "==":
-            return val1 == val2
+            return val1 === val2
           case "!=":
-            return val1 != val2
+            return val1 !== val2
server/workflow-tools/jira.ts (1)

61-85: Stub implementation - tracking implementation status.

The TODO on line 67 indicates this is a placeholder. When implementing the actual Jira integration, ensure you handle authentication errors, rate limiting, and API response validation.

Would you like me to help design the Jira API integration logic or create a separate issue to track this implementation?

server/workflow-tools/slack.ts (1)

57-69: Stub implementation - no actual Slack integration.

The execute method returns a static response without making any Slack API calls. The configured channel, username, and emoji are ignored.

Would you like me to help design the Slack API integration using the Slack SDK or webhook approach?

server/workflow-tools/merged-node.ts (1)

41-53: Missing merge logic implementation.

The execute method ignores the configured mergeStrategy, waitForAll, and timeout options. The tool should implement actual merge logic based on these settings (e.g., merging objects, concatenating arrays, or selecting the latest value).

Would you like me to help design the merge logic for different strategies, or should this be tracked as a follow-up task?

server/server.ts (1)

1568-1569: Consider extracting inline arrow functions for better maintainability.

The inline arrow functions wrapping HandleStateChangeTemplateApi are functional but could be more maintainable if extracted as named handlers or created via a factory function.

Example refactor:

// Extract as named handlers
const handleActivateTemplate = (c: Context) => 
  HandleStateChangeTemplateApi(c, TemplateState.ACTIVE)
const handleDeactivateTemplate = (c: Context) => 
  HandleStateChangeTemplateApi(c, TemplateState.INACTIVE)

// Then use in routes:
.post("/workflow/template/activate", handleActivateTemplate)
.post("/workflow/template/deactivate", handleDeactivateTemplate)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c842052 and 3705c9b.

📒 Files selected for processing (40)
  • server/api/knowledgeBase.ts (4 hunks)
  • server/api/oauth.ts (1 hunks)
  • server/api/workflow-execution.ts (1 hunks)
  • server/api/workflow-template-step.ts (1 hunks)
  • server/api/workflow-template.ts (1 hunks)
  • server/config.ts (2 hunks)
  • server/db/schema/workflows.ts (8 hunks)
  • server/execution-engine/communication-service.ts (1 hunks)
  • server/execution-engine/execution-client.ts (1 hunks)
  • server/execution-engine/execution-engine-queue.ts (1 hunks)
  • server/execution-engine/index.ts (1 hunks)
  • server/execution-engine/message-queue.ts (1 hunks)
  • server/execution-engine/server.ts (1 hunks)
  • server/execution-engine/step-executor.ts (1 hunks)
  • server/execution-engine/triggers.ts (1 hunks)
  • server/execution-engine/types.ts (1 hunks)
  • server/execution-engine/workflow-executor.ts (1 hunks)
  • server/package.json (2 hunks)
  • server/queue/api-server-queue.ts (1 hunks)
  • server/queue/boss.ts (1 hunks)
  • server/queue/index.ts (9 hunks)
  • server/server.ts (4 hunks)
  • server/types.ts (1 hunks)
  • server/types/workflowTypes.ts (3 hunks)
  • server/workflow-tools/agent.ts (1 hunks)
  • server/workflow-tools/ai-agent.ts (1 hunks)
  • server/workflow-tools/delay.ts (1 hunks)
  • server/workflow-tools/email.ts (1 hunks)
  • server/workflow-tools/form.ts (1 hunks)
  • server/workflow-tools/gmail.ts (1 hunks)
  • server/workflow-tools/http-request.ts (1 hunks)
  • server/workflow-tools/jira.ts (1 hunks)
  • server/workflow-tools/manual-trigger.ts (1 hunks)
  • server/workflow-tools/merged-node.ts (1 hunks)
  • server/workflow-tools/registry.ts (1 hunks)
  • server/workflow-tools/scheduler-trigger.ts (1 hunks)
  • server/workflow-tools/slack.ts (1 hunks)
  • server/workflow-tools/switch.ts (1 hunks)
  • server/workflow-tools/types.ts (1 hunks)
  • server/workflow-tools/webhook.ts (1 hunks)
🧰 Additional context used
🧠 Learnings (10)
📓 Common learnings
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.
📚 Learning: 2025-09-02T16:41:31.729Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowTools.ts:9-14
Timestamp: 2025-09-02T16:41:31.729Z
Learning: In the workflowTools.ts schema, the user prefers to add notNull() constraint to the type field but does not want onDelete cascade behavior for the workflowTemplateId foreign key reference.

Applied to files:

  • server/workflow-tools/types.ts
  • server/db/schema/workflows.ts
📚 Learning: 2025-09-12T13:28:43.782Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.

Applied to files:

  • server/api/workflow-execution.ts
  • server/server.ts
📚 Learning: 2025-10-27T12:58:52.760Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: frontend/src/components/workflow/WorkflowCard.tsx:47-48
Timestamp: 2025-10-27T12:58:52.760Z
Learning: In the workflow sharing feature, the UserWorkflowRole enum defined in server/shared/types.ts has three valid values: Owner = "owner", Shared = "shared", and Viewer = "viewer". Checking workflow.role === "shared" is valid and correct for detecting workflows that have been shared with a user.

Applied to files:

  • server/types/workflowTypes.ts
📚 Learning: 2025-10-28T07:51:03.144Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: server/db/schema/workflows.ts:320-320
Timestamp: 2025-10-28T07:51:03.144Z
Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.

Applied to files:

  • server/db/schema/workflows.ts
  • server/api/workflow-template-step.ts
  • server/api/workflow-template.ts
  • server/server.ts
📚 Learning: 2025-09-04T09:29:29.879Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowServiceConfig.ts:51-53
Timestamp: 2025-09-04T09:29:29.879Z
Learning: In the workflowServiceConfig.ts schema, the user prefers to keep currentWorkflowExeId as a simple uuid reference without adding foreign key constraints or onDelete behaviors to the workflowExe table.

Applied to files:

  • server/db/schema/workflows.ts
  • server/execution-engine/workflow-executor.ts
📚 Learning: 2025-09-16T08:57:58.762Z
Learnt from: rahul1841
Repo: xynehq/xyne PR: 843
File: server/server.ts:996-996
Timestamp: 2025-09-16T08:57:58.762Z
Learning: The DownloadFileApi handler in server/api/knowledgeBase.ts properly uses the clId route parameter to scope and validate file access within the correct collection.

Applied to files:

  • server/api/knowledgeBase.ts
📚 Learning: 2025-05-28T10:55:46.701Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 484
File: server/integrations/google/gmail-worker.ts:293-294
Timestamp: 2025-05-28T10:55:46.701Z
Learning: There are two separate `parseMail` functions in the codebase: one in `server/integrations/google/gmail-worker.ts` with signature `(email, gmail, client, userEmail)` returning `{ mailData, insertedPdfCount, exist }`, and another in `server/integrations/google/gmail/index.ts` with signature `(email, gmail, userEmail, client, tracker?)` returning `{ mailData, exist }`. Each file calls its own local version correctly.

Applied to files:

  • server/workflow-tools/gmail.ts
📚 Learning: 2025-06-17T08:55:40.153Z
Learnt from: oindrila-b
Repo: xynehq/xyne PR: 557
File: server/integrations/google/gmail-worker.ts:313-317
Timestamp: 2025-06-17T08:55:40.153Z
Learning: In server/integrations/google/gmail-worker.ts, the log message intentionally shows totalMails (cumulative count) while sendStatsUpdate uses insertedMessagesInBatch (batch count). This is by design - logs show total progress for human readability while stats updates track batch progress for system functionality.

Applied to files:

  • server/workflow-tools/gmail.ts
📚 Learning: 2025-07-10T08:02:41.059Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 651
File: server/tests/extractEmailHeadersVal.test.ts:49-58
Timestamp: 2025-07-10T08:02:41.059Z
Learning: The test file `server/tests/extractEmailHeadersVal.test.ts` is primarily designed to verify email extraction from headers like to, from, cc, and bcc. Message-ID related functionality refactoring is planned for a future PR.

Applied to files:

  • server/workflow-tools/email.ts
🧬 Code graph analysis (32)
server/workflow-tools/types.ts (2)
server/db/schema/workflows.ts (4)
  • ToolExecutionStatus (480-480)
  • ToolType (478-478)
  • ToolCategory (479-479)
  • SelectWorkflowTemplate (251-253)
server/api/workflow-template.ts (1)
  • workflowToolType (282-282)
server/execution-engine/triggers.ts (7)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/communication-service.ts (1)
  • handleManualTrigger (156-178)
server/db/schema/workflows.ts (8)
  • workflowStepExecution (178-204)
  • WorkflowStatus (476-476)
  • toolExecution (207-225)
  • ToolExecutionStatus (480-480)
  • SelectWorkflowTemplate (251-253)
  • workflowStepTemplate (101-124)
  • workflowTool (127-147)
  • ToolType (478-478)
server/execution-engine/types.ts (1)
  • ExecutionPacket (70-78)
server/execution-engine/execution-engine-queue.ts (1)
  • queueNextSteps (150-298)
server/workflow-tools/registry.ts (1)
  • getTool (39-45)
server/workflow-tools/types.ts (1)
  • WorkflowTool (23-30)
server/execution-engine/types.ts (1)
server/workflow-tools/types.ts (1)
  • ToolExecutionResult (13-18)
server/workflow-tools/jira.ts (2)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/db/schema/workflows.ts (3)
  • ToolType (478-478)
  • ToolCategory (479-479)
  • ToolExecutionStatus (480-480)
server/workflow-tools/ai-agent.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/execution-engine/communication-service.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/message-queue.ts (4)
  • ExecutionMessage (10-15)
  • ExecutionResponse (24-29)
  • messageQueue (276-276)
  • ExecutionRequest (17-22)
server/execution-engine/workflow-executor.ts (1)
  • workflowExecutor (359-359)
server/execution-engine/triggers.ts (1)
  • handleManualTrigger (18-128)
server/execution-engine/message-queue.ts (2)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/index.ts (1)
  • messageQueue (6-6)
server/api/workflow-execution.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/db/user.ts (1)
  • getUserFromJWT (488-500)
server/execution-engine/execution-client.ts (1)
  • executionClient (137-137)
server/db/schema/workflows.ts (6)
  • workflowExecution (150-175)
  • workflowTemplate (71-98)
  • workflowStepExecution (178-204)
  • workflowStepTemplate (101-124)
  • toolExecution (207-225)
  • workflowTool (127-147)
server/types/workflowTypes.ts (1)
server/db/schema/workflows.ts (2)
  • ToolCategory (479-479)
  • ToolExecutionStatus (480-480)
server/execution-engine/execution-engine-queue.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/types.ts (2)
  • ExecutionPacket (70-78)
  • StepExecutionResult (81-89)
server/execution-engine/step-executor.ts (1)
  • stepExecutor (555-555)
server/db/schema/workflows.ts (4)
  • workflowExecution (150-175)
  • WorkflowStatus (476-476)
  • workflowStepExecution (178-204)
  • workflowStepTemplate (101-124)
server/workflow-tools/agent.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/http-request.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/switch.ts (2)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/workflow-tools/types.ts (4)
  • WorkflowTool (23-30)
  • defaultToolConfig (20-20)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/execution-engine/index.ts (3)
server/execution-engine/execution-client.ts (1)
  • executionClient (137-137)
server/execution-engine/workflow-executor.ts (1)
  • workflowExecutor (359-359)
server/execution-engine/communication-service.ts (1)
  • communicationService (217-217)
server/workflow-tools/delay.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/execution-engine/execution-client.ts (2)
server/execution-engine/message-queue.ts (2)
  • ExecutionRequest (17-22)
  • messageQueue (276-276)
server/queue/boss.ts (1)
  • boss (4-7)
server/workflow-tools/slack.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/registry.ts (2)
server/workflow-tools/types.ts (1)
  • ToolRegistry (33-35)
server/workflow-tools/email.ts (1)
  • EmailTool (5-111)
server/workflow-tools/webhook.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/merged-node.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/manual-trigger.ts (1)
server/workflow-tools/types.ts (4)
  • WorkflowTool (23-30)
  • defaultToolConfig (20-20)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/api/workflow-template-step.ts (5)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/db/schema/workflows.ts (4)
  • ToolType (478-478)
  • workflowTemplate (71-98)
  • workflowTool (127-147)
  • workflowStepTemplate (101-124)
server/api/workflow-template.ts (1)
  • workflowToolConfigSchema (184-194)
server/db/user.ts (1)
  • getUserFromJWT (488-500)
server/workflow-tools/registry.ts (1)
  • getToolCategory (59-62)
server/execution-engine/step-executor.ts (5)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/types.ts (2)
  • ExecutionPacket (70-78)
  • StepExecutionResult (81-89)
server/workflow-tools/registry.ts (1)
  • getTool (39-45)
server/db/schema/workflows.ts (5)
  • ToolExecutionStatus (480-480)
  • WorkflowStatus (476-476)
  • workflowTool (127-147)
  • workflowStepExecution (178-204)
  • toolExecution (207-225)
server/workflow-tools/types.ts (2)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/workflow-tools/scheduler-trigger.ts (3)
server/workflow-tools/types.ts (4)
  • WorkflowTool (23-30)
  • defaultToolConfig (20-20)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/db/schema/workflows.ts (4)
  • ToolType (478-478)
  • ToolCategory (479-479)
  • ToolExecutionStatus (480-480)
  • SelectWorkflowTemplate (251-253)
server/execution-engine/message-queue.ts (2)
  • ExecutionRequest (17-22)
  • messageQueue (276-276)
server/api/workflow-template.ts (5)
server/logger/index.ts (3)
  • getLoggerWithChild (192-200)
  • Subsystem (15-15)
  • getLogger (36-93)
server/workflow-tools/registry.ts (3)
  • getToolCategory (59-62)
  • getSupportedToolTypes (53-55)
  • getTool (39-45)
server/db/schema/workflows.ts (5)
  • ToolType (478-478)
  • workflowTemplate (71-98)
  • workflowTool (127-147)
  • workflowStepTemplate (101-124)
  • SelectWorkflowTemplate (251-253)
server/db/user.ts (1)
  • getUserFromJWT (488-500)
server/execution-engine/triggers.ts (1)
  • handleTemplateStateChange (131-226)
server/execution-engine/server.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/message-queue.ts (1)
  • messageQueue (276-276)
server/execution-engine/execution-engine-queue.ts (1)
  • initExecutionEngineQueue (29-39)
server/execution-engine/communication-service.ts (1)
  • communicationService (217-217)
server/workflow-tools/gmail.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/server.ts (4)
server/api/workflow-template.ts (8)
  • GetAvailableToolTypesApi (126-153)
  • GetToolTypeSchemaApi (156-182)
  • createTemplateSchema (212-219)
  • CreateTemplateApi (285-491)
  • GetTemplateApi (566-667)
  • UpdateTemplateApi (494-563)
  • DeleteTemplateApi (670-730)
  • HandleStateChangeTemplateApi (866-929)
server/api/workflow-template-step.ts (8)
  • addTemplateStepSchema (26-47)
  • AddTemplateStepApiHandler (89-259)
  • updateTemplateStepSchema (49-70)
  • UpdateTemplateStepApiHandler (262-485)
  • deleteTemplateStepSchema (72-74)
  • DeleteTemplateStepApiHandler (487-640)
  • deleteLinkSchema (76-86)
  • deleteLink (643-782)
server/api/workflow-execution.ts (6)
  • ExecuteTemplateHandler (24-53)
  • GetExecutionStatusApi (56-79)
  • GetWorkflowExecutionDetailsApi (164-359)
  • StopExecutionApi (82-108)
  • GetEngineHealthApi (111-132)
  • HandleManualTrigger (135-161)
server/execution-engine/message-queue.ts (1)
  • messageQueue (276-276)
server/workflow-tools/email.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/execution-engine/workflow-executor.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/db/schema/workflows.ts (7)
  • workflowExecution (150-175)
  • WorkflowStatus (476-476)
  • workflowTemplate (71-98)
  • workflowStepTemplate (101-124)
  • workflowStepExecution (178-204)
  • workflowTool (127-147)
  • ToolCategory (479-479)
server/execution-engine/types.ts (1)
  • ExecutionPacket (70-78)
server/execution-engine/execution-engine-queue.ts (1)
  • sendExecutionPacket (91-116)
server/workflow-tools/form.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-10)
  • ToolExecutionResult (13-18)
server/queue/index.ts (2)
server/queue/api-server-queue.ts (1)
  • boss (9-12)
server/queue/boss.ts (1)
  • boss (4-7)
🔇 Additional comments (10)
server/config.ts (1)

10-10: LGTM!

The execution engine configuration follows the existing pattern for other services (sync server) and provides appropriate defaults. The centralized configuration approach enables proper separation of concerns for the new execution engine subsystem.

Also applies to: 15-15, 280-281

server/workflow-tools/delay.ts (1)

21-101: LGTM on overall implementation.

The DelayTool has well-defined schemas and proper error handling. The unit conversion logic is clear and the implementation correctly handles input overrides.

server/workflow-tools/switch.ts (1)

10-193: Well-implemented conditional routing logic.

The SwitchTool demonstrates good practices with comprehensive logging, proper error handling, multiple evaluation modes, and clear helper methods for value extraction and comparison.

server/workflow-tools/jira.ts (1)

23-27: Ensure API tokens are not logged or exposed.

The apiToken field contains sensitive credentials. When this tool is implemented, verify that:

  1. API tokens are never logged (even in error messages)
  2. Tokens are stored securely (encrypted at rest)
  3. Tokens are transmitted only over HTTPS
  4. Consider using OAuth instead of API tokens if possible
server/workflow-tools/registry.ts (1)

20-36: Registry wiring looks solid.
The consolidated registry neatly instantiates the tool catalog, and the helper exports keep the API consistent.

server/execution-engine/types.ts (1)

69-89: Type definitions line up with the queue contract.
ExecutionPacket and StepExecutionResult capture the runtime shape used across the engine; no issues spotted.

server/server.ts (4)

337-358: LGTM: Workflow API imports are properly structured.

The imports for workflow template, template step, and execution APIs are well-organized and all entities are used in the route definitions below.


1583-1583: LGTM: Manual trigger endpoint is properly secured.

The manual trigger route is correctly placed after AuthMiddleware and follows the expected pattern for triggering workflow steps manually.

Based on learnings.


2426-2429: LGTM: Message queue initialization is properly implemented.

The message queue is initialized with:

  • Dynamic import for lazy loading
  • Proper awaiting of the initialization call
  • Clear commenting explaining the inter-service communication purpose
  • Appropriate error handling via the init().catch() block at line 2463

1537-1573: Route pattern inconsistency confirmed between old and new workflow endpoints.

The codebase has conflicting URL patterns for workflow resources:

  • Templates: /workflow/templates/:templateId (lines 1485-1487, 1493, plural) vs. /workflow/template/:templateId (lines 1544-1546, singular)
  • Executions: /workflow/executions/:executionId (line 1513, plural) vs. /workflow/execution/:executionId/* (lines 1570-1572, singular)
  • API handler mismatch: Internal code at server/api/workflow.ts:792 generates URLs using plural form (/api/v1/workflow/executions/${execution.id}/status), which conflicts with the new singular endpoint pattern

This duplication creates API confusion, maintenance complexity, and potential routing conflicts. Standardize all workflow endpoints to consistently use either plural or singular resource names.

⛔ Skipped due to learnings
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: server/db/schema/workflows.ts:320-320
Timestamp: 2025-10-28T07:51:03.144Z
Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.

Comment on lines +135 to +152
export const HandleManualTrigger = async (c: Context) => {
const { workflowId, stepId } = c.req.param()

try {
Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`)

// Use execution client to trigger the step (same pattern as ExecuteTemplateHandler)
const result = await executionClient.triggerManualStep(workflowId, stepId, "manual")

Logger.info(`✅ Manual trigger processed for step ${stepId}`)

return c.json({
success: true,
message: "Manual trigger completed successfully",
stepId: stepId,
workflowId: workflowId,
result: result
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Enforce authentication before manual triggers

This handler never validates the caller’s JWT, so anyone who can hit the route can manually fire workflow steps—an authorization bypass that contradicts our requirement that all workflow endpoints be protected. Please authenticate first (even if the return value isn’t used) so unauthorized requests fail immediately, e.g.:

 export const HandleManualTrigger = async (c: Context) => {
   const { workflowId, stepId } = c.req.param()
 
   try {
+    await getUserFromJWT(db, c.get(JwtPayloadKey))
     Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`)

Based on learnings.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/api/workflow-execution.ts around lines 135 to 152, the handler
triggers manual workflow steps without authenticating the caller; add an
explicit authentication check at the top of the handler (await the
JWT/authorization validation for the incoming request and short-circuit with a
401/403 response on failure) so unauthorized callers cannot proceed, ensure the
auth call is awaited before calling executionClient.triggerManualStep, and keep
the existing logging and success response only after authentication succeeds.

Comment on lines 803 to 828
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix root-node detection in template validation

validateTemplateLogic builds workflowNodeNames from node.name but compares them to connection UUIDs (source/target). Because the identifiers never match, every node is treated as “root”, so the trigger-at-start rule is silently skipped. Invalid templates (no trigger on actual entry points) now pass and will fail later at execution time. Please base the root detection on the node UUIDs returned by extractUUIDFromArray instead of the human-readable names.

Update the logic to track UUIDs throughout the DFS and root checks so only genuine start nodes are accepted.

import { workflowExecutor } from "./workflow-executor"
import { handleManualTrigger } from "./triggers"

const Logger = getLogger(Subsystem.WorkflowApi)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Incorrect logger subsystem.

The logger uses Subsystem.WorkflowApi but this is the execution engine communication service. Based on line 9 in server/execution-engine/server.ts, the correct subsystem should be Subsystem.ExecutionEngine.

Apply this diff:

-const Logger = getLogger(Subsystem.WorkflowApi)
+const Logger = getLogger(Subsystem.ExecutionEngine)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const Logger = getLogger(Subsystem.WorkflowApi)
const Logger = getLogger(Subsystem.ExecutionEngine)
🤖 Prompt for AI Agents
In server/execution-engine/communication-service.ts around line 7 the logger is
created with the wrong subsystem (Subsystem.WorkflowApi); change it to use
Subsystem.ExecutionEngine so logs are correctly attributed to the execution
engine communication service. Update the getLogger call to pass
Subsystem.ExecutionEngine and ensure the Subsystem import (if present) is not
accidentally altered.

Comment on lines +156 to +178
private async handleManualTrigger(request: { workflowId: string, stepId: string, triggeredBy: string }): Promise<ExecutionResponse> {
try {
// Create a mock context object with the required parameters
const mockContext = {
req: {
param: () => ({ workflowId: request.workflowId, stepId: request.stepId })
},
json: (data: any) => data
} as any

const result = await handleManualTrigger(mockContext)

return {
success: true,
data: result
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : "Failed to trigger manual step"
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Brittle mock context bypasses type safety.

Lines 158-164 create a mock context object with as any to call handleManualTrigger, which expects a Hono Context. This is fragile and bypasses TypeScript's type checking.

Consider refactoring handleManualTrigger in server/execution-engine/triggers.ts to extract the business logic into a separate function that doesn't depend on the Hono Context:

// In triggers.ts - add new function
export const triggerManualStep = async (
  workflowId: string, 
  stepId: string, 
  triggeredBy: string
) => {
  // Move lines 17-106 from handleManualTrigger here
  // Return the result directly without c.json()
}

// Then handleManualTrigger becomes:
export const handleManualTrigger = async (c: Context) => {
  const { workflowId, stepId } = c.req.param()
  try {
    const result = await triggerManualStep(workflowId, stepId, "manual")
    return c.json(result)
  } catch (error) {
    // error handling
  }
}

Then in communication-service.ts:

 private async handleManualTrigger(request: { workflowId: string, stepId: string, triggeredBy: string }): Promise<ExecutionResponse> {
   try {
-    // Create a mock context object with the required parameters
-    const mockContext = {
-      req: {
-        param: () => ({ workflowId: request.workflowId, stepId: request.stepId })
-      },
-      json: (data: any) => data
-    } as any
-
-    const result = await handleManualTrigger(mockContext)
+    const result = await triggerManualStep(
+      request.workflowId, 
+      request.stepId, 
+      request.triggeredBy
+    )
     
     return {
       success: true,
       data: result
     }
🤖 Prompt for AI Agents
In server/execution-engine/communication-service.ts around lines 156 to 178, the
code builds a brittle mock Hono Context with "as any" to call
handleManualTrigger; instead extract the business logic out of
handleManualTrigger into a new exported, context-free function (e.g.,
triggerManualStep(workflowId: string, stepId: string, triggeredBy: string):
Promise<YourResultType>) in server/execution-engine/triggers.ts, refactor
handleManualTrigger to parse params from the Hono Context and call
triggerManualStep then return c.json(result), and update communication-service
to call the new triggerManualStep directly (passing request.workflowId,
request.stepId, request.triggeredBy) and handle its typed result/error—this
removes the mock context, preserves TypeScript safety, and requires updating
imports and types accordingly.

Comment on lines +67 to +79
async execute(
input: Record<string, any>,
config: Record<string, any>,
workflowContext: WorkflowContext
): Promise<ToolExecutionResult> {
return {
status: ToolExecutionStatus.COMPLETED,
output: {
message: "Gmail task completed",
data: input,
},
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Align execute output with the declared schema.
outputSchema (Lines 48-55) requires to, subject, body, sentAt, and status, but the implementation on Lines 73-77 only returns { message, data }. Any downstream validator consuming the schema will break. Please populate the required fields from config/input and include a sentAt timestamp and status string.

Apply this patch:

-  async execute(
-    input: Record<string, any>,
-    config: Record<string, any>,
-    workflowContext: WorkflowContext
-  ): Promise<ToolExecutionResult> {
-    return {
-      status: ToolExecutionStatus.COMPLETED,
-      output: {
-        message: "Gmail task completed",
-        data: input,
-      },
-    }
-  }
+  async execute(
+    input: Record<string, any>,
+    config: Record<string, any>,
+    workflowContext: WorkflowContext
+  ): Promise<ToolExecutionResult> {
+    const to = config.to ?? []
+    const subject = input.subject ?? ""
+    const body = input.body ?? ""
+
+    return {
+      status: ToolExecutionStatus.COMPLETED,
+      output: {
+        messageId: workflowContext.currentToolId,
+        to,
+        subject,
+        body,
+        sentAt: new Date().toISOString(),
+        status: "sent",
+        attachments: input.attachments ?? [],
+        message: "Gmail task completed",
+      },
+    }
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async execute(
input: Record<string, any>,
config: Record<string, any>,
workflowContext: WorkflowContext
): Promise<ToolExecutionResult> {
return {
status: ToolExecutionStatus.COMPLETED,
output: {
message: "Gmail task completed",
data: input,
},
}
}
async execute(
input: Record<string, any>,
config: Record<string, any>,
workflowContext: WorkflowContext
): Promise<ToolExecutionResult> {
const to = config.to ?? []
const subject = input.subject ?? ""
const body = input.body ?? ""
return {
status: ToolExecutionStatus.COMPLETED,
output: {
messageId: workflowContext.currentToolId,
to,
subject,
body,
sentAt: new Date().toISOString(),
status: "sent",
attachments: input.attachments ?? [],
message: "Gmail task completed",
},
}
}
🤖 Prompt for AI Agents
In server/workflow-tools/gmail.ts around lines 67 to 79, the execute() return
value currently returns {message, data} but must match the declared outputSchema
(to, subject, body, sentAt, status); update the returned output to populate
to/subject/body from config or input (prefer config if provided, fall back to
input), set sentAt to the current ISO timestamp (new Date().toISOString()),
include a status string such as "sent" (or "failed" on error), and retain any
helpful message or extra data under a separate field; ensure the final shape
includes keys: to, subject, body, sentAt, status (and optional message/data) so
downstream validation succeeds.

Comment on lines +52 to +75
async execute(
input: Record<string, any>,
config: Record<string, any>,
workflowContext: WorkflowContext
): Promise<ToolExecutionResult> {
try {
// TODO: Implement HTTP request execution logic
return {
status: ToolExecutionStatus.COMPLETED,
output: {
status: 200,
data: { message: "HTTP request executed successfully" },
duration: 100,
}
}
} catch (error) {
return {
status: ToolExecutionStatus.FAILED,
output: {
error: error instanceof Error ? error.message : "Unknown error"
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Stub implementation with critical security implications.

The TODO indicates this is a placeholder, but when implemented, this tool will need SSRF (Server-Side Request Forgery) protection to prevent requests to internal/private networks.

Before implementing the HTTP request logic, ensure you:

  1. Validate and sanitize the URL to block private IP ranges (127.0.0.0/8, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, etc.)
  2. Implement DNS rebinding protection
  3. Restrict allowed protocols (http/https only)
  4. Apply timeout and size limits
  5. Consider using an allowlist for permitted domains

Would you like me to generate a secure implementation or create a separate issue for this?

🤖 Prompt for AI Agents
In server/workflow-tools/http-request.ts around lines 52 to 75, the execute
method is a stub that lacks any SSRF protections; replace the placeholder with a
secure HTTP request implementation that (1) strictly validates and normalizes
the input URL and rejects non-http(s) schemes, (2) resolves the hostname and
blocks requests to private/internal IP ranges (127.0.0.0/8, 10.0.0.0/8,
172.16.0.0/12, 192.168.0.0/16, link-local, and IPv6 equivalents), (3) defends
against DNS rebinding by resolving DNS separately and ensuring the resolved IP
remains allowed for the entire request, (4) enforces timeouts and response size
limits and parses/validates headers, and (5) optionally enforces an allowlist of
domains and returns clear error states via ToolExecutionStatus.FAILED when
checks fail; implement these checks before performing the network call and
ensure all failures surface a safe error message.

Comment on lines +37 to +39
inputSchema = z.object({})
outputSchema = z.object({})
configSchema = z.object({})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Empty schemas provide no validation.

All schemas are empty z.object({}), which means no runtime validation. This is particularly important for a merge node that needs to handle multiple inputs.

Consider defining schemas that reflect the merge behavior:

-  inputSchema = z.object({})
-  outputSchema = z.object({})
-  configSchema = z.object({})
+  inputSchema = z.object({
+    // Schema should accommodate multiple input shapes
+  })
+
+  outputSchema = z.object({
+    merged: z.record(z.string(), z.any()),
+    sources: z.array(z.string()).optional()
+  })
+
+  configSchema = z.object({
+    inputCount: z.number().min(1).max(5).default(1),
+    mergeStrategy: z.enum(["merge", "concat", "latest"]).default("merge"),
+    waitForAll: z.boolean().default(true),
+    timeout: z.number().positive().default(300000)
+  })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inputSchema = z.object({})
outputSchema = z.object({})
configSchema = z.object({})
inputSchema = z.object({
// Schema should accommodate multiple input shapes
})
outputSchema = z.object({
merged: z.record(z.string(), z.any()),
sources: z.array(z.string()).optional()
})
configSchema = z.object({
inputCount: z.number().min(1).max(5).default(1),
mergeStrategy: z.enum(["merge", "concat", "latest"]).default("merge"),
waitForAll: z.boolean().default(true),
timeout: z.number().positive().default(300000)
})
🤖 Prompt for AI Agents
In server/workflow-tools/merged-node.ts around lines 37 to 39 the three schemas
are empty (inputSchema = z.object({}), outputSchema = z.object({}), configSchema
= z.object({})), which provides no runtime validation; replace them with
concrete schemas that reflect the merge node behavior — for inputSchema define
the expected shape for multiple upstream payloads (e.g., a record keyed by input
name or an array of objects with required fields) and validate types for the
data you will merge, for outputSchema define the merged result shape (required
keys and value types) and any post-merge invariants, and for configSchema define
merge strategy options (e.g., conflictResolution: z.enum([...]), keysToKeep:
z.array(z.string()).optional()) ; update any derived TypeScript types to use
z.infer<typeof ...>, add minimal unit tests for valid/invalid inputs and ensure
calling code adapts to the new schema errors (throw/return) so the node fails
fast on invalid data.

Comment on lines +9 to +34
defaultConfig = {
inputCount: 1,
outputCount: 1,
options: {
channel: {
type: "string",
default: "",
optional: false
},
username: {
type: "string",
default: "Workflow Bot",
optional: true
},
iconEmoji: {
type: "string",
default: ":robot_face:",
optional: true
},
threadTs: {
type: "string",
default: "",
optional: true
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing authentication configuration.

The Slack tool lacks authentication fields (bot token or webhook URL) in its defaultConfig. Without these, the tool cannot connect to Slack when implemented.

Add authentication to the defaultConfig:

   defaultConfig = {
     inputCount: 1,
     outputCount: 1,
     options: {
+      token: {
+        type: "string",
+        default: "",
+        optional: false
+      },
+      // OR for webhook-based approach:
+      // webhookUrl: {
+      //   type: "string",
+      //   default: "",
+      //   optional: false
+      // },
       channel: {
         type: "string",
         default: "",
         optional: false
       },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
defaultConfig = {
inputCount: 1,
outputCount: 1,
options: {
channel: {
type: "string",
default: "",
optional: false
},
username: {
type: "string",
default: "Workflow Bot",
optional: true
},
iconEmoji: {
type: "string",
default: ":robot_face:",
optional: true
},
threadTs: {
type: "string",
default: "",
optional: true
}
}
}
defaultConfig = {
inputCount: 1,
outputCount: 1,
options: {
token: {
type: "string",
default: "",
optional: false
},
// OR for webhook-based approach:
// webhookUrl: {
// type: "string",
// default: "",
// optional: false
// },
channel: {
type: "string",
default: "",
optional: false
},
username: {
type: "string",
default: "Workflow Bot",
optional: true
},
iconEmoji: {
type: "string",
default: ":robot_face:",
optional: true
},
threadTs: {
type: "string",
default: "",
optional: true
}
}
}
🤖 Prompt for AI Agents
In server/workflow-tools/slack.ts around lines 9 to 34, the defaultConfig is
missing Slack authentication fields; add options for botToken and webhookUrl
(both type "string", default "", optional true) to defaultConfig, document that
botToken takes precedence over webhookUrl, and update any validation to error at
runtime if neither is provided so callers must supply at least one; do not
hardcode secrets—prefer reading from environment when omitted and mark these
fields as sensitive in any config metadata.

}

// Get template with access validation
private async getTemplate(templateId: string, workspaceId: number) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need all this db helper functions like getTemplate createWorkflowExecution since these are already defined in the /db/workflow.ts file? Also this getTemplate is supposed to do access validation but its not checking for user access to the template

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will connect with you on how this access validation is working, i need to implement that

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (11)
server/execution-engine/triggers.ts (2)

45-55: Scope the pending tool lookup to this step.

Line 50 still filters only by workflowExecutionId and status, so any other WAITING tool execution in the same workflow can be marked COMPLETE, dequeuing the wrong branch. Please guard the lookup with this step’s toolExecIds, and short-circuit when the list is empty.

+    const toolExecIds = step.toolExecIds ?? []
+    if (toolExecIds.length === 0) {
+      Logger.warn(`Step ${stepId} has no pending tool executions`)
+      return c.json({
+        error: "No pending tool execution found"
+      }, 404)
+    }
+
     const [toolExec] = await db
       .select()
       .from(toolExecution)
       .where(
         and(
           eq(toolExecution.workflowExecutionId, workflowId),
-          eq(toolExecution.status, ToolExecutionStatus.AWAITING_USER_INPUT)
+          eq(toolExecution.status, ToolExecutionStatus.AWAITING_USER_INPUT),
+          sql`${toolExecution.id} = ANY(${sql.array(toolExecIds, "uuid")})`
         )
       )
       .limit(1)

135-147: Restrict template triggers to root steps again.

Lines 139-143 now rely solely on toolCategory, so non-root trigger steps fire on every state change. That revives the bug we discussed earlier—downstream triggers become active without their predecessors. Please restore the prevStepIds null/empty filter.

     const triggerSteps = await db
       .select()
       .from(workflowStepTemplate)
       .where(
         and(
           eq(workflowStepTemplate.workflowTemplateId, template.id),
-          eq(workflowStepTemplate.toolCategory, ToolCategory.TRIGGER),
+          eq(workflowStepTemplate.toolCategory, ToolCategory.TRIGGER),
+          or(
+            sql`${workflowStepTemplate.prevStepIds} IS NULL`,
+            sql`array_length(${workflowStepTemplate.prevStepIds}, 1) = 0`
+          )
         )
       )
server/api/workflow-execution.ts (1)

115-133: Authenticate manual trigger requests.

Line 120 logs the manual trigger without validating the caller, leaving this endpoint open to anyone who can hit it. We require every workflow API to authenticate first—please await getUserFromJWT (and fail fast on errors) before dispatching the manual trigger. Based on learnings

 export const HandleManualTrigger = async (c: Context) => {
   const { workflowId, stepId } = c.req.param()
   
   try {
+    await getUserFromJWT(db, c.get(JwtPayloadKey))
     Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`)
 
     // Use execution client to trigger the step (same pattern as ExecuteTemplateHandler)
     const result = await executionClient.triggerManualStep(workflowId, stepId, "manual")
server/execution-engine/execution-engine-queue.ts (1)

48-83: Handle single pg-boss job per invocation.

pg-boss.work delivers a single job, so iterating jobs throws TypeError: job is not iterable and the worker never runs. Replace the loop with direct single-job handling.

-  await executionBoss.work(ExecutionQueue, async (jobs) => {
-    for (const job of jobs) {
-      const packet = job.data as ExecutionPacket
-      try {
+  await executionBoss.work(ExecutionQueue, async (job) => {
+    const packet = job.data as ExecutionPacket
+    try {
         Logger.info(`🔄 EXECUTION WORKER PICKED UP PACKET:`)
         Logger.info(`   Template ID: ${packet.template_id}`)
         Logger.info(`   Workflow ID: ${packet.workflow_id}`)
         Logger.info(`   Step ID: ${packet.step_id}`)
         Logger.info(`   Previous Tool ID: ${packet.previous_tool_id}`)
         Logger.info(`   Input: ${JSON.stringify(packet.input)}`)
         Logger.info(`   Job ID: ${job.id}`)
         
         // Execute the step using StepExecutor
         const result = await stepExecutor.executeStep(packet)
         
         // Log execution result
         Logger.info(`📋 Step execution result: ${JSON.stringify(result)}`)
 
         // Queue next steps if execution should continue
         if (result.nextAction === 'continue') {
           await queueNextSteps(packet, result)
         } else if (result.nextAction === 'wait_for_input') {
           Logger.info(`⏳ Step is waiting for additional inputs, not queuing next steps`)
         }
         
         Logger.info(`✅ Packet processing completed for job ${job.id}`)
         
-      } catch (error) {
-        Logger.error(error, `❌ Error processing execution packet for job ${job.id}`)
-        markWorkflowStatus(packet.workflow_id,false).catch((err) => {
-          Logger.error(err, `Failed to mark workflow ${packet.workflow_id} as FAILED after execution error`)
-        })
-      }
-    }
+    } catch (error) {
+      Logger.error(error, `❌ Error processing execution packet for job ${job.id}`)
+      markWorkflowStatus(packet.workflow_id,false).catch((err) => {
+        Logger.error(err, `Failed to mark workflow ${packet.workflow_id} as FAILED after execution error`)
+      })
+    }
   })
server/execution-engine/communication-service.ts (2)

7-7: Incorrect logger subsystem (previously flagged).

The logger is initialized with Subsystem.WorkflowApi, but this is the execution engine communication service. Based on the module's purpose and sibling files (e.g., server/execution-engine/server.ts uses Subsystem.ExecutionEngine), the correct subsystem should be Subsystem.ExecutionEngine.

Apply this diff:

-const Logger = getLogger(Subsystem.WorkflowApi)
+const Logger = getLogger(Subsystem.ExecutionEngine)

Based on past review comments.


135-157: Brittle mock context bypasses type safety (previously flagged).

Lines 138-143 create a mock context object with as any to call handleManualTrigger, which expects a Hono Context. This is fragile and bypasses TypeScript's type checking. The recommended approach from previous reviews is to extract the business logic from handleManualTrigger into a separate context-free function that can be called directly by both this service and the Hono API handler.

Consider refactoring as suggested in previous reviews: extract a triggerManualStep(workflowId, stepId, triggeredBy) function in server/execution-engine/triggers.ts that contains the core logic, then call it directly without the mock context.

Based on past review comments.

server/execution-engine/workflow-executor.ts (2)

224-232: Missing parent step linkage in execution graph (previously flagged).

createStepExecutions updates prevStepIds and nextStepIds but never sets parentStepId on the created step execution rows. This means the parent-child hierarchy from the template is not preserved in the execution records. Any downstream logic that relies on the execution graph parent relationships (e.g., traversing up the tree, computing breadcrumbs) will lose the parent linkage even though the template modeled it.

Apply this diff to map template parent IDs to execution IDs:

       // Update the step execution with relationships and metadata
+      const parentStepExecId =
+        templateStep.parentStepId
+          ? stepExecutions.get(templateStep.parentStepId) ?? null
+          : null
+
       await db
         .update(workflowStepExecution)
         .set({
+          parentStepId: parentStepExecId,
           prevStepIds: prevStepExecIds,
           nextStepIds: nextStepExecIds,
           metadata: updatedMetadata,
         })
         .where(eq(workflowStepExecution.id, stepExecId))

Based on past review comments.


73-86: Incomplete access validation in getTemplate.

The getTemplate method checks that the template belongs to the correct workspace (line 80) but doesn't verify user-specific permissions. Per past review comments, this method is supposed to perform access validation, but it's not checking whether the requesting user has permission to access the template (via userWorkflowPermissions or isPublic checks).

Consider using the existing getWorkflowTemplateByIdWithPermissionCheck helper from server/db/workflow.ts (lines 67-108 of relevant snippets) instead of this custom query, or implement similar permission logic here.

Based on past review comments.

server/execution-engine/step-executor.ts (2)

108-111: RUNNING steps incorrectly marked as COMPLETED (previously flagged).

When a tool reports ToolExecutionStatus.RUNNING, the code immediately returns nextAction: 'continue' and persists the step as WorkflowStatus.COMPLETED (line 110). This prematurely advances the workflow while the tool is still executing, guaranteeing downstream steps consume incomplete data and making it impossible to resume correctly.

Apply this diff to keep the step in a non-terminal state:

       case ToolExecutionStatus.RUNNING:
-        nextAction = 'continue'
-        stepStatus = WorkflowStatus.COMPLETED
+        nextAction = 'wait_for_input'
+        stepStatus = WorkflowStatus.ACTIVE
         break

Based on past review comments.


397-426: Partial inputs not cleared after combining results (previously flagged).

Once all required inputs arrive and combinedInput is built (lines 398-419), the code never clears partialInputs from metadata. On the next execution cycle, those stale entries make the step believe every route is already satisfied, causing it to reuse old tool execution IDs and feed last-run outputs forward. This corrupts multi-input workflows after the first run.

After building combinedInput and before returning shouldExecute: true, reset the consumed partial input state within the same transaction. Update the step metadata to clear partialInputs to an empty object.

Based on past review comments.

server/api/workflow-template.ts (1)

287-297: Fragile connection reconstruction using non-unique names (previously flagged).

Reconstructing connections using step names (e.g., ${step.name}-${nextStep.name} on line 293) is fragile because step names are not guaranteed to be unique within a workflow template. If two steps have the same name, this logic will lead to incorrect connections being created or ambiguous connection identification.

Use step IDs instead, which are guaranteed to be unique. The step already has nextStepIds array with UUIDs, so you could reconstruct connections directly from those IDs without name-based lookups.

Based on past review comments.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3705c9b and 1ab95b7.

📒 Files selected for processing (25)
  • server/api/workflow-execution.ts (1 hunks)
  • server/api/workflow-template-step.ts (1 hunks)
  • server/api/workflow-template.ts (1 hunks)
  • server/db/schema/workflows.ts (8 hunks)
  • server/db/workflow.ts (2 hunks)
  • server/execution-engine/communication-service.ts (1 hunks)
  • server/execution-engine/execution-client.ts (1 hunks)
  • server/execution-engine/execution-engine-queue.ts (1 hunks)
  • server/execution-engine/message-queue.ts (1 hunks)
  • server/execution-engine/step-executor.ts (1 hunks)
  • server/execution-engine/triggers.ts (1 hunks)
  • server/execution-engine/types.ts (1 hunks)
  • server/execution-engine/workflow-executor.ts (1 hunks)
  • server/package.json (2 hunks)
  • server/server.ts (4 hunks)
  • server/workflow-tools/agent.ts (1 hunks)
  • server/workflow-tools/ai-agent.ts (1 hunks)
  • server/workflow-tools/delay.ts (1 hunks)
  • server/workflow-tools/manual-trigger.ts (1 hunks)
  • server/workflow-tools/merged-node.ts (1 hunks)
  • server/workflow-tools/registry.ts (1 hunks)
  • server/workflow-tools/scheduler-trigger.ts (1 hunks)
  • server/workflow-tools/slack.ts (1 hunks)
  • server/workflow-tools/switch.ts (1 hunks)
  • server/workflow-tools/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
  • server/workflow-tools/agent.ts
  • server/workflow-tools/slack.ts
  • server/workflow-tools/ai-agent.ts
  • server/workflow-tools/merged-node.ts
  • server/api/workflow-template-step.ts
  • server/workflow-tools/switch.ts
  • server/workflow-tools/manual-trigger.ts
  • server/workflow-tools/types.ts
  • server/package.json
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-10-17T08:54:59.756Z
Learnt from: debajyoti0606
Repo: xynehq/xyne PR: 1075
File: server/workflowScriptExecutorTool.ts:36-77
Timestamp: 2025-10-17T08:54:59.756Z
Learning: In server/workflowScriptExecutorTool.ts, the validateJson function's escape sequence normalization logic (lines 41-52 that replace \n, \r, \t, \f, \b, \", and \\ before parsing) is intentionally designed to handle edge cases and should not be flagged as potentially corrupting valid JSON. The double-escaping behavior is expected.

Applied to files:

  • server/api/workflow-execution.ts
📚 Learning: 2025-09-12T13:28:43.782Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.

Applied to files:

  • server/api/workflow-execution.ts
  • server/api/workflow-template.ts
  • server/server.ts
📚 Learning: 2025-06-16T11:56:22.752Z
Learnt from: oindrila-b
Repo: xynehq/xyne PR: 545
File: server/integrations/google/index.ts:137-145
Timestamp: 2025-06-16T11:56:22.752Z
Learning: In server/integrations/google/index.ts, both Logger (base logger) and loggerWithChild (factory for email-scoped child loggers) are intentionally maintained to handle different logging scenarios - one for when email context is not available and one for when it is available.

Applied to files:

  • server/execution-engine/communication-service.ts
📚 Learning: 2025-09-02T16:41:31.729Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowTools.ts:9-14
Timestamp: 2025-09-02T16:41:31.729Z
Learning: In the workflowTools.ts schema, the user prefers to add notNull() constraint to the type field but does not want onDelete cascade behavior for the workflowTemplateId foreign key reference.

Applied to files:

  • server/db/workflow.ts
  • server/db/schema/workflows.ts
📚 Learning: 2025-10-28T07:51:03.144Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: server/db/schema/workflows.ts:320-320
Timestamp: 2025-10-28T07:51:03.144Z
Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.

Applied to files:

  • server/db/workflow.ts
  • server/db/schema/workflows.ts
  • server/api/workflow-template.ts
  • server/server.ts
📚 Learning: 2025-09-04T09:29:29.879Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowServiceConfig.ts:51-53
Timestamp: 2025-09-04T09:29:29.879Z
Learning: In the workflowServiceConfig.ts schema, the user prefers to keep currentWorkflowExeId as a simple uuid reference without adding foreign key constraints or onDelete behaviors to the workflowExe table.

Applied to files:

  • server/db/schema/workflows.ts
  • server/execution-engine/workflow-executor.ts
📚 Learning: 2025-05-28T10:47:41.020Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 484
File: server/integrations/google/sync.ts:222-222
Timestamp: 2025-05-28T10:47:41.020Z
Learning: The functions `handleGoogleDriveChange` and `getDriveChanges` in `server/integrations/google/sync.ts` are intentionally exported for future changes, even though they are not currently being imported by other modules.

Applied to files:

  • server/server.ts
📚 Learning: 2025-09-02T16:37:08.246Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowExecution.ts:29-29
Timestamp: 2025-09-02T16:37:08.246Z
Learning: In the Xyne workflow system schema, cascade delete behavior should not be added to the foreign key relationship between workflowStepExe.workflowExeId and workflowExe.id. Manual control over step deletion is preferred over automatic cascade behavior.

Applied to files:

  • server/execution-engine/workflow-executor.ts
🧬 Code graph analysis (15)
server/workflow-tools/delay.ts (1)
server/workflow-tools/types.ts (3)
  • WorkflowTool (23-30)
  • WorkflowContext (5-9)
  • ToolExecutionResult (12-18)
server/execution-engine/message-queue.ts (1)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/api/workflow-execution.ts (5)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/db/user.ts (1)
  • getUserFromJWT (488-500)
server/db/workflow.ts (1)
  • getWorkflowTemplateByIdWithPermissionCheck (68-109)
server/execution-engine/execution-client.ts (1)
  • executionClient (119-119)
server/db/schema/workflows.ts (6)
  • workflowExecution (153-178)
  • workflowTemplate (71-98)
  • workflowStepExecution (181-210)
  • workflowStepTemplate (101-128)
  • toolExecution (213-230)
  • workflowTool (131-150)
server/execution-engine/types.ts (1)
server/workflow-tools/types.ts (1)
  • ToolExecutionResult (12-18)
server/execution-engine/triggers.ts (7)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/communication-service.ts (1)
  • handleManualTrigger (135-157)
server/db/schema/workflows.ts (8)
  • workflowStepExecution (181-210)
  • WorkflowStatus (481-481)
  • toolExecution (213-230)
  • ToolExecutionStatus (485-485)
  • SelectWorkflowTemplate (256-258)
  • workflowStepTemplate (101-128)
  • ToolCategory (484-484)
  • ToolType (483-483)
server/execution-engine/types.ts (1)
  • ExecutionPacket (70-77)
server/execution-engine/execution-engine-queue.ts (1)
  • queueNextSteps (149-230)
server/workflow-tools/registry.ts (1)
  • getTool (39-45)
server/workflow-tools/types.ts (1)
  • WorkflowTool (23-30)
server/execution-engine/execution-client.ts (2)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/message-queue.ts (2)
  • ExecutionRequest (17-22)
  • messageQueue (289-289)
server/execution-engine/communication-service.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/message-queue.ts (4)
  • ExecutionMessage (10-15)
  • ExecutionResponse (24-29)
  • messageQueue (289-289)
  • ExecutionRequest (17-22)
server/execution-engine/workflow-executor.ts (1)
  • workflowExecutor (312-312)
server/execution-engine/triggers.ts (1)
  • handleManualTrigger (18-127)
server/workflow-tools/registry.ts (1)
server/workflow-tools/types.ts (2)
  • ToolRegistry (33-35)
  • defaultToolConfig (20-20)
server/workflow-tools/scheduler-trigger.ts (3)
server/workflow-tools/types.ts (4)
  • WorkflowTool (23-30)
  • defaultToolConfig (20-20)
  • WorkflowContext (5-9)
  • ToolExecutionResult (12-18)
server/db/schema/workflows.ts (4)
  • ToolType (483-483)
  • ToolCategory (484-484)
  • ToolExecutionStatus (485-485)
  • SelectWorkflowTemplate (256-258)
server/execution-engine/message-queue.ts (2)
  • ExecutionRequest (17-22)
  • messageQueue (289-289)
server/execution-engine/execution-engine-queue.ts (3)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/types.ts (2)
  • ExecutionPacket (70-77)
  • StepExecutionResult (80-88)
server/db/schema/workflows.ts (3)
  • workflowExecution (153-178)
  • WorkflowStatus (481-481)
  • workflowStepExecution (181-210)
server/db/workflow.ts (2)
server/db/schema/workflows.ts (3)
  • workflowTemplate (71-98)
  • selectWorkflowTemplateSchema (233-233)
  • workflowStepTemplate (101-128)
server/types.ts (1)
  • TxnOrClient (340-340)
server/execution-engine/step-executor.ts (5)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/execution-engine/types.ts (2)
  • ExecutionPacket (70-77)
  • StepExecutionResult (80-88)
server/db/schema/workflows.ts (6)
  • ToolExecutionStatus (485-485)
  • ToolType (483-483)
  • workflowStepExecution (181-210)
  • WorkflowStatus (481-481)
  • toolExecution (213-230)
  • SelectWorkflowStepExecution (266-268)
server/workflow-tools/types.ts (2)
  • WorkflowContext (5-9)
  • ToolExecutionResult (12-18)
server/workflow-tools/registry.ts (1)
  • getTool (39-45)
server/api/workflow-template.ts (8)
server/logger/index.ts (3)
  • getLoggerWithChild (192-200)
  • Subsystem (15-15)
  • getLogger (36-93)
server/workflow-tools/registry.ts (3)
  • getSupportedToolTypes (53-55)
  • getToolCategory (59-62)
  • getTool (39-45)
server/db/schema/workflows.ts (4)
  • ToolType (483-483)
  • workflowTemplate (71-98)
  • workflowStepTemplate (101-128)
  • SelectWorkflowTemplate (256-258)
server/db/user.ts (1)
  • getUserFromJWT (488-500)
server/db/schema/userWorkflowPermissions.ts (1)
  • insertUserWorkflowPermissionSchema (56-62)
server/db/userWorkflowPermissions.ts (1)
  • grantUserWorkflowPermission (41-57)
server/db/workflow.ts (1)
  • getWorkflowTemplateByIdWithPermissionCheck (68-109)
server/execution-engine/triggers.ts (1)
  • handleTemplateStateChange (130-206)
server/server.ts (4)
server/api/workflow-template.ts (8)
  • GetAvailableToolTypesApi (34-59)
  • GetToolTypeSchemaApi (62-88)
  • createTemplateSchema (104-109)
  • CreateTemplateApi (166-213)
  • GetTemplateApi (269-342)
  • UpdateTemplateApi (216-266)
  • DeleteTemplateApi (345-385)
  • HandleStateChangeTemplateApi (387-459)
server/api/workflow-template-step.ts (8)
  • addTemplateStepSchema (27-48)
  • AddTemplateStepApiHandler (90-280)
  • updateTemplateStepSchema (50-71)
  • UpdateTemplateStepApiHandler (283-501)
  • deleteTemplateStepSchema (73-75)
  • DeleteTemplateStepApiHandler (503-663)
  • deleteLinkSchema (77-87)
  • deleteLink (666-810)
server/api/workflow-execution.ts (5)
  • ExecuteTemplateHandler (25-58)
  • GetWorkflowExecutionDetailsApi (144-347)
  • StopExecutionApi (62-88)
  • GetEngineHealthApi (91-112)
  • HandleManualTrigger (115-141)
server/execution-engine/message-queue.ts (1)
  • messageQueue (289-289)
server/execution-engine/workflow-executor.ts (4)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/db/schema/workflows.ts (6)
  • workflowExecution (153-178)
  • WorkflowStatus (481-481)
  • workflowTemplate (71-98)
  • workflowStepTemplate (101-128)
  • workflowStepExecution (181-210)
  • ToolCategory (484-484)
server/execution-engine/types.ts (1)
  • ExecutionPacket (70-77)
server/execution-engine/execution-engine-queue.ts (1)
  • sendExecutionPacket (93-118)
🔇 Additional comments (2)
server/db/schema/workflows.ts (1)

215-217: Verify optional workflow tool ID is intentional.

The workflowToolId field on the toolExecution table has been made optional (lines 216-217 comment out the .notNull() and .references()). This means tool executions can now exist without a reference to a workflow tool. Ensure this is intentional and aligns with your execution model where tools might be defined inline on steps rather than as separate tool entities.

If this change supports inline tool definitions (via toolCategory, toolType, toolConfig on steps), please confirm that all execution paths handle the null case correctly.

server/server.ts (1)

2422-2424: LGTM! Clean message queue initialization.

The dynamic import and await pattern for message queue initialization is well-structured and ensures the queue is properly set up before the server starts processing requests.

Comment on lines +118 to 135
const [teamplateStep] = await trx
.select({
workflowTemplateId: workflowStepTemplate.workflowTemplateId,
})
.from(workflowStepTemplate)
.where(eq(workflowStepTemplate.id, templateStepId))
.limit(1)
const template = await getWorkflowTemplateByIdWithPermissionCheck(
trx,
teamplateStep.workflowTemplateId,
workspaceId,
userId
)
if (!template) {
return false
}
return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against missing template step before dereferencing.

If no workflow step template exists for the supplied templateStepId, teamplateStep is undefined and teamplateStep.workflowTemplateId on Line 127 throws, turning an access check into a 500 instead of a clean “no access” result. Please bail out when the lookup returns nothing:

-  const [teamplateStep] = await trx
+  const [templateStep] = await trx
     .select({
       workflowTemplateId: workflowStepTemplate.workflowTemplateId,
     })
     .from(workflowStepTemplate)
     .where(eq(workflowStepTemplate.id, templateStepId))
     .limit(1)
-  const template = await getWorkflowTemplateByIdWithPermissionCheck(
-    trx,
-    teamplateStep.workflowTemplateId,
-    workspaceId,
-    userId
-  )
+  if (!templateStep) {
+    return false
+  }
+
+  const template = await getWorkflowTemplateByIdWithPermissionCheck(
+    trx,
+    templateStep.workflowTemplateId,
+    workspaceId,
+    userId
+  )

This preserves the intended boolean contract and prevents an avoidable runtime crash.

🤖 Prompt for AI Agents
In server/db/workflow.ts around lines 118 to 135, the code dereferences
teamplateStep.workflowTemplateId without checking whether teamplateStep is
defined; add a guard immediately after the DB select (e.g. if (!teamplateStep)
return false) so the function returns false when no template step is found
instead of throwing, then proceed to call
getWorkflowTemplateByIdWithPermissionCheck only when teamplateStep exists.

Comment on lines +160 to +189
// Use boss.schedule for cron expressions
await this.boss.schedule(this.INCOMING_QUEUE, cron, message, {
tz: 'UTC',
key : request.templateId
})
Logger.info(`Scheduled execution with cron '${cron}' and correlation ID: ${correlationId}`)
} else if (start_at) {
// Use boss.sendAfter for specific start time
await this.boss.sendAfter(this.INCOMING_QUEUE, message, {}, start_at)
Logger.info(`Scheduled execution for ${start_at} with correlation ID: ${correlationId}`)
} else {
// Use boss.send for immediate execution
await this.boss.send(this.INCOMING_QUEUE, message)
Logger.info(`Published immediate execution message with correlation ID: ${correlationId}`)
}

return correlationId
}

async schedule(request: ExecutionRequest, cron: string): Promise<string> {
return this.publishExecution(request, undefined, cron)
}

async unschedule(templateId: string): Promise<void> {
if (!this.checkInitialized()) {
throw new Error("Message queue not initialized")
}

await this.boss.unschedule(this.INCOMING_QUEUE, templateId)
Logger.info(`Unscheduled executions for template ID: ${templateId}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Unique cron schedules are overwritten and can’t be unscheduled per template.

Line 162 reuses the same queue name for every boss.schedule call, but per pg-boss scheduling docs (schedule(name, cron, ...) will update an existing schedule when the name matches), so activating a second template replaces the first instead of creating a distinct cron job. On Line 188 we then call boss.unschedule(this.INCOMING_QUEUE, templateId), yet unschedule(name) only looks at the schedule name; the extra templateId is ignored, meaning this deletes whichever singleton schedule exists for that queue and still can’t target a specific template. Net effect: only one cron template can exist at a time, and unscheduling the wrong template will silently drop the remaining one.

Please generate a per-template schedule identifier (e.g. ${this.INCOMING_QUEUE}:${request.templateId}) and use the same name for both schedule and unschedule. Also drop the unused key option so the intent is unambiguous. For example:

-      await this.boss.schedule(this.INCOMING_QUEUE, cron, message, {
-        tz: 'UTC',
-        key :  request.templateId
-      })
+      const scheduleName = `${this.INCOMING_QUEUE}:${request.templateId}`
+      await this.boss.schedule(scheduleName, cron, message, { tz: 'UTC' })
...
-    await this.boss.unschedule(this.INCOMING_QUEUE, templateId)
+    const scheduleName = `${this.INCOMING_QUEUE}:${templateId}`
+    await this.boss.unschedule(scheduleName)

Without this change, cron-triggered workflows will either clobber each other or disappear unexpectedly.


I'll verify the review comment by examining the actual code and checking the pg-boss library documentation.


🏁 Script executed:

#!/bin/bash
# Find and examine the message-queue.ts file
find . -name "message-queue.ts" -type f 2>/dev/null | head -20

Length of output: 42


🏁 Script executed:

#!/bin/bash
# Check the actual implementation around the lines mentioned
if [ -f "server/execution-engine/message-queue.ts" ]; then
  sed -n '150,200p' "server/execution-engine/message-queue.ts"
fi

Length of output: 1694


🏁 Script executed:

#!/bin/bash
# Search for pg-boss imports and version
rg -n "pg-boss|pgboss" --type=ts --type=json -B2 -A2

Length of output: 9389


🏁 Script executed:

#!/bin/bash
# Check package.json for pg-boss version
if [ -f "package.json" ]; then
  jq '.dependencies, .devDependencies' package.json | grep -A5 -B5 "pg-boss"
fi

Length of output: 0


🌐 Web query:

What is the pg-boss schedule() and unschedule() method behavior? Does schedule() overwrite if same name is used? Does unschedule() accept a templateId parameter?

💡 Result:

Short answer:

  • pg-boss schedule(name, cron, data, options) registers a named schedule; if a schedule with the same name already exists it is updated/replaced (i.e. it overwrites the existing schedule). [1][2]

  • unschedule(...) for pg-boss takes the schedule name (queue/name) to remove the schedule. It does not accept a templateId parameter. Use the name returned/used when scheduling. [1][2]

Sources: pg-boss API/docs and changelog. [1] (gist.github.com)

Citations:


🌐 Web query:

pg-boss schedule method documentation parameters

💡 Result:

Summary — pg-boss schedule(name, cron, data, options)

  • signature: schedule(name, cron, data, options). [1]
  • name (string) — required: the job/queue name to schedule. [1]
  • cron (string) — required: cron expression (normally 5-field minute precision; pg-boss checks schedules periodically). [1][3]
  • data (object) — optional: payload that will be passed to the created job. [1]
  • options (object) — optional: accepts the same options as send() (e.g., retention*, startAfter/startIn, singletonKey/singletonSeconds/… and other job/send-level controls) plus an optional tz property to set timezone (defaults to UTC). [1][4]

Notes

  • If a schedule already exists for the given name it is updated. [1]
  • Scheduling relies on at least one running instance; scheduling checks are debounced (minute-level behavior noted in changelog/issues). [1][3]

Sources
[1] pg-boss scheduling docs (schedule API).
[2] pg-boss GitHub repository (project README).
[3] pg-boss changelog / releases (cron behavior).
[4] pg-boss send/options reference (examples of send() options).


Unique cron schedules are overwritten and can't be unscheduled per template.

The pg-boss schedule() method overwrites existing schedules when the same name is used, and unschedule() accepts only the schedule name parameter and does not use templateId. Your code reuses this.INCOMING_QUEUE for all template schedules (line 162), causing subsequent scheduling calls to replace prior ones. When unscheduling on line 188, the templateId parameter is ignored by pg-boss, so unscheduling any template silently removes whichever singleton schedule exists regardless of which template is actually targeted.

Generate per-template schedule names using ${this.INCOMING_QUEUE}:${request.templateId} for both schedule() and unschedule() calls, and remove the unused key option:

-      await this.boss.schedule(this.INCOMING_QUEUE, cron, message, {
-        tz: 'UTC',
-        key :  request.templateId
-      })
+      const scheduleName = `${this.INCOMING_QUEUE}:${request.templateId}`
+      await this.boss.schedule(scheduleName, cron, message, { tz: 'UTC' })
...
-    await this.boss.unschedule(this.INCOMING_QUEUE, templateId)
+    const scheduleName = `${this.INCOMING_QUEUE}:${templateId}`
+    await this.boss.unschedule(scheduleName)

Without this fix, only one cron-triggered workflow template can exist at a time, and unscheduling will silently clobber all others.

🤖 Prompt for AI Agents
In server/execution-engine/message-queue.ts around lines 160 to 189, the code
uses a single schedule name this.INCOMING_QUEUE for all cron schedules and
passes a redundant key option, which causes pg-boss to overwrite schedules and
makes unschedule(templateId) ineffective; change the schedule and unschedule
calls to use a per-template schedule name like
`${this.INCOMING_QUEUE}:${request.templateId}` (construct the name before
calling boss.schedule and boss.unschedule), remove the unused key option from
the boss.schedule call, and in unschedule(templateId) build the same
per-template schedule name and pass it to boss.unschedule so each template has
its own cron schedule that can be individually removed.

})
.where(eq(workflowStepExecution.id, stepId))

Logger.info(`📊 Updated step ${stepId} status to ${status}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Undefined variable reference.

Line 84 logs status in the success message, but there is no status variable defined in the updateFailedStepStatus method scope. This will cause a runtime error. The method only ever sets the status to WorkflowStatus.FAILED.

Apply this diff:

-        Logger.info(`📊 Updated step ${stepId} status to ${status}`)
+        Logger.info(`📊 Updated step ${stepId} status to FAILED`)

Or define a local variable if status might vary in the future.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Logger.info(`📊 Updated step ${stepId} status to ${status}`)
Logger.info(`📊 Updated step ${stepId} status to FAILED`)
🤖 Prompt for AI Agents
In server/execution-engine/step-executor.ts around line 84,
updateFailedStepStatus logs a nonexistent variable `status`, causing a runtime
error; replace the reference with the actual status value being set
(WorkflowStatus.FAILED) or define a local const status = WorkflowStatus.FAILED
before the log and use that variable so the log references a defined value and
matches the state change.


Logger.info(`📊 Updated step ${stepId} status to ${status}`)
} catch (error) {
Logger.error(error, `Failed to update step ${stepId} status to ${status}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Another undefined variable reference in error log.

Line 86 also references the undefined status variable in the error log within the catch block.

Apply this diff:

-      Logger.error(error, `Failed to update step ${stepId} status to ${status}`)
+      Logger.error(error, `Failed to update step ${stepId} status to FAILED`)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Logger.error(error, `Failed to update step ${stepId} status to ${status}`)
Logger.error(error, `Failed to update step ${stepId} status to FAILED`)
🤖 Prompt for AI Agents
In server/execution-engine/step-executor.ts around line 86, the catch block uses
an undefined variable `status` in the Logger.error call; replace that reference
with the actual in-scope status variable (for example the local variable that
holds the intended status such as newStatus, stepStatus, statusToSet, or the
step.status property), or if no such variable exists, remove the status
interpolation and log only the stepId and error; update the Logger.error
invocation to reference the correct variable name so the error message compiles
and shows the intended status.

import type { ExecutionPacket } from "./types"
import { ToolCategory } from "@/db/schema/workflows"

const Logger = getLogger(Subsystem.WorkflowApi)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Inconsistent logger subsystem.

The logger uses Subsystem.WorkflowApi but this is part of the execution engine. For consistency with other execution engine modules (e.g., server/execution-engine/step-executor.ts line 10 uses Subsystem.ExecutionEngine), this should also use Subsystem.ExecutionEngine.

Apply this diff:

-const Logger = getLogger(Subsystem.WorkflowApi)
+const Logger = getLogger(Subsystem.ExecutionEngine)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const Logger = getLogger(Subsystem.WorkflowApi)
const Logger = getLogger(Subsystem.ExecutionEngine)
🤖 Prompt for AI Agents
In server/execution-engine/workflow-executor.ts around line 16, the logger is
created with Subsystem.WorkflowApi which is inconsistent with other
execution-engine modules; update the getLogger call to use
Subsystem.ExecutionEngine instead (e.g., replace Subsystem.WorkflowApi with
Subsystem.ExecutionEngine), ensure the Subsystem symbol is imported/available in
this file, and run linters/tests to confirm no import or naming issues.

success: true,
data: {
executionId,
status: "STOPPED",
Copy link
Contributor

@YashJuspay626 YashJuspay626 Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be stop_requested?Since we cannot stop instantly.

templateState: workflowTemplate.state,
})
.from(workflowExecution)
.leftJoin(workflowTemplate, eq(workflowExecution.workflowTemplateId, workflowTemplate.id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we avoid this left join. SInce the intermediate table will be very large?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments