Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughThis PR introduces a complete workflow execution engine with message-queue-based orchestration, comprehensive tool implementations, and server APIs for template management, step lifecycle, and execution control. It also updates job queue expiration configurations from hours to seconds for consistency. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant MainServer as Main Server
participant MessageQueue as Message Queue
participant ExecEngine as Execution Engine
participant Database as Database
Client->>MainServer: POST /workflow/template/execute
MainServer->>MainServer: Validate & authenticate
MainServer->>MessageQueue: publishExecution (START_EXECUTION)
MainServer-->>Client: Return executionId
ExecEngine->>MessageQueue: Poll for messages
MessageQueue-->>ExecEngine: START_EXECUTION
ExecEngine->>ExecEngine: handleStartExecution
ExecEngine->>Database: Create workflow_execution
ExecEngine->>Database: Create step_executions
ExecEngine->>ExecEngine: queueNextSteps (root steps)
ExecEngine->>MessageQueue: publishResponse
ExecEngine->>ExecEngine: Process root step packet
ExecEngine->>ExecEngine: StepExecutor.executeStep
ExecEngine->>ExecEngine: Execute tool (EmailTool, FormTool, etc.)
ExecEngine->>Database: Create tool_execution record
ExecEngine->>ExecEngine: Determine nextAction
alt nextAction = continue
ExecEngine->>ExecEngine: queueNextSteps (next steps)
else nextAction = wait_for_input
ExecEngine->>Database: Mark step WAITING
end
Client->>MainServer: POST /workflow/:id/manual-trigger/:stepId
MainServer->>ExecEngine: triggerManualStep
ExecEngine->>ExecEngine: queueNextSteps (pending step)
ExecEngine->>Database: Update execution status
ExecEngine-->>MainServer: Success
MainServer-->>Client: Trigger acknowledged
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120+ minutes Areas requiring extra attention:
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @debajyoti0606, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust and scalable workflow execution engine, fundamentally changing how automated processes are handled. It establishes a new communication layer using PgBoss for asynchronous job processing, defines a flexible structure for workflow templates and steps, and provides a rich set of API endpoints for full lifecycle management of workflows. The changes lay the groundwork for highly customizable and extensible automation capabilities within the application. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a new executor engine, which is a significant addition to the codebase. The changes are extensive, including new API endpoints, database schema modifications, and a new message queue-based architecture for background processing. The overall structure is well-designed, but there are several areas for improvement regarding error handling, efficiency, and code correctness. I've identified a critical bug in the template validation logic and other high-severity issues related to input validation and fragile implementation details. Additionally, there are opportunities to improve code readability and maintainability.
server/api/workflow-template.ts
Outdated
There was a problem hiding this comment.
There appears to be a bug in this condition. target is a UUID, but it's being compared to node.name, which is a string. This comparison will always be false. It should likely be target === node.uuid. Similarly, workflowNodeNames is a Set of string names, but source is a UUID, so workflowNodeNames.has(source) will also always be false. You should create a set of UUIDs for this check.
const workflowNodeUUIDs = new Set(workflow.map(n => n.uuid))
return !connections.some((conn: string[]) => {
const [[source,], [target,]] = extractUUIDFromArray(conn)
return target === node.uuid && workflowNodeUUIDs.has(source)
})| const requestData = await c.req.json() | ||
| const { templateId } = requestData | ||
|
|
||
| if (!templateId) { | ||
| throw new HTTPException(400, { message: "templateId is required in request body" }) | ||
| } |
There was a problem hiding this comment.
The request body is parsed from JSON without any validation. This can lead to runtime errors if the request body is malformed or missing required fields. It's a good practice to use a validation library like zod to define a schema for the request body and validate it. This pattern is already used in other new files like server/api/workflow-template-step.ts. This would improve type safety and provide clear error messages for invalid requests. This feedback also applies to other handlers in this file that process request bodies or parameters without validation.
| const connections: string[] = [] | ||
| steps.forEach(step => { | ||
| if (step.nextStepIds && step.nextStepIds.length > 0) { | ||
| step.nextStepIds.forEach(nextStepId => { | ||
| const nextStep = steps.find(s => s.id === nextStepId) | ||
| if (nextStep) { | ||
| connections.push(`${step.name}-${nextStep.name}`) | ||
| } | ||
| }) | ||
| } | ||
| }) |
There was a problem hiding this comment.
Reconstructing connections using step names (e.g., ${step.name}-${nextStep.name}) is fragile because step names are not guaranteed to be unique within a workflow template. If two steps have the same name, this logic will lead to incorrect connections being created. It would be more robust to use the unique step IDs for this purpose.
| { | ||
| retryLimit: 3, | ||
| expireInHours: 12, | ||
| expireInSeconds: 43200, |
There was a problem hiding this comment.
For better readability and maintainability, it's recommended to avoid using magic numbers. The value 43200 can be expressed as a calculation, which makes the intent clearer. This is consistent with the approach taken in server/api/oauth.ts (JobExpiryHours * 3600). This comment also applies to other occurrences of 43200 in this file.
expireInSeconds: 12 * 60 * 60,| ) | ||
|
|
||
| if (!template) { | ||
| throw new Error("Template not found or access denied") |
There was a problem hiding this comment.
Throwing a generic Error within a database transaction will result in a generic 500 Internal Server Error response to the client. It's better to use HTTPException with a specific status code (e.g., 404 for 'not found', 400 for 'bad request') to provide more meaningful feedback to the API consumer. This allows the client to handle different error scenarios appropriately. This applies to other throw new Error(...) statements within this file's API handlers.
| throw new Error("Template not found or access denied") | |
| throw new HTTPException(404, { message: "Template not found or access denied" }) |
server/api/workflow-template.ts
Outdated
There was a problem hiding this comment.
This query fetches all tools for the current workspace and then filters them in memory. This can be inefficient if the workspace contains a large number of tools. Since you have the IDs of the newly created tools in toolIdMap, you can fetch only the relevant tools directly from the database using inArray.
| const createdTools = await db | |
| .select() | |
| .from(workflowTool) | |
| .where(eq(workflowTool.workspaceId, user.workspaceId)) | |
| const createdTools = toolIdMap.size > 0 ? await db | |
| .select() | |
| .from(workflowTool) | |
| .where(inArray(workflowTool.id, Array.from(toolIdMap.values()))) | |
| : [] |
| const mockContext = { | ||
| req: { | ||
| param: () => ({ workflowId: request.workflowId, stepId: request.stepId }) | ||
| }, | ||
| json: (data: any) => data | ||
| } as any |
There was a problem hiding this comment.
Creating a mock Context object to call handleManualTrigger couples this service to the implementation details of an API handler. This is considered a code smell. A better approach would be to extract the core logic from handleManualTrigger into a separate function that can be called directly by both this service and the Hono API handler. This would improve modularity and make the code easier to test and maintain.
| async pollForResponse(correlationId: string, timeoutMs: number = 30000): Promise<ExecutionResponse> { | ||
| if (!this.checkInitialized()) { | ||
| throw new Error("Message queue not initialized") | ||
| } | ||
|
|
||
| const startTime = Date.now() | ||
| const pollInterval = 500 // Poll every 500ms | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| const poll = async () => { | ||
| try { | ||
| // Get the internal database connection from PgBoss | ||
| const db = this.getDatabase() | ||
|
|
||
| // Direct SQL query to find job by correlation ID | ||
| // Using JSONB operator ->> for efficient correlation ID filtering | ||
| const findJobSql = ` | ||
| SELECT id, data, state, created_on | ||
| FROM pgboss.job | ||
| WHERE name = $1 | ||
| AND state = 'created' | ||
| AND data->>'correlationId' = $2 | ||
| ORDER BY created_on ASC | ||
| LIMIT 1 | ||
| ` | ||
|
|
||
| const result = await db.executeSql(findJobSql, [this.OUTGOING_QUEUE, correlationId]) | ||
|
|
||
| if (result?.rows?.length > 0) { | ||
| const job = result.rows[0] | ||
| const jobData = job.data as ExecutionResponse | ||
|
|
||
| // Extract response data | ||
| const response: ExecutionResponse = { | ||
| success: jobData.success, | ||
| executionId: jobData.executionId, | ||
| error: jobData.error, | ||
| data: jobData.data | ||
| } | ||
|
|
||
| // Mark job as completed using direct SQL | ||
| const completeJobSql = ` | ||
| UPDATE pgboss.job | ||
| SET state = 'completed', | ||
| completed_on = now() | ||
| WHERE id = $1 | ||
| ` | ||
|
|
||
| await db.executeSql(completeJobSql, [job.id]) | ||
|
|
||
| Logger.info(`Response found and processed for correlation ID: ${correlationId}`) | ||
| resolve(response) | ||
| return | ||
| } | ||
|
|
||
| // Check timeout | ||
| if (Date.now() - startTime >= timeoutMs) { | ||
| reject(new Error(`Response timeout for correlation ID: ${correlationId}`)) | ||
| return | ||
| } | ||
|
|
||
| // Continue polling | ||
| setTimeout(poll, pollInterval) | ||
|
|
||
| } catch (error) { | ||
| Logger.error(error, `Error polling for response ${correlationId}`) | ||
| reject(error) | ||
| } | ||
| } | ||
|
|
||
| poll() | ||
| }) | ||
| } |
There was a problem hiding this comment.
The pollForResponse function relies on direct SQL queries against the internal schema of pg-boss. This is a fragile approach, as the pgboss.job table structure could change in future versions of the library, which would break this code. While this might be a deliberate choice for performance, it's worth noting the risk. A safer, though potentially less performant, approach would be to use the public API of pg-boss if it provides a way to fetch jobs by custom attributes.
There was a problem hiding this comment.
Actionable comments posted: 27
🧹 Nitpick comments (12)
server/workflow-tools/form.ts (1)
79-92: Preserve field configuration in error case.When an error occurs, the error response returns an empty
fieldsarray, losing the form configuration. This makes it difficult for the client to understand what form was expected.Apply this diff:
} catch (error) { return { status: ToolExecutionStatus.FAILED, output: { form_id: "", form_title: config.title || "Form Input Required", form_description: config.description || "Please fill out the form", - fields: [], + fields: config.fields || [], submitted: false, submission_data: {}, error: error instanceof Error ? error.message : String(error), }, } }server/workflow-tools/webhook.ts (1)
46-68: Implement webhook execution logic and add schema validation.The webhook tool currently returns mock data and lacks input/config validation. The TODO at line 52 indicates this is incomplete.
Two concerns:
- Missing implementation: The tool returns mock success data without making actual HTTP requests.
- No validation: Like the FormTool, the defined
inputSchemaandconfigSchemaare not used to validate parameters.Apply this diff to add validation:
async execute( input: Record<string, any>, config: Record<string, any>, workflowContext: WorkflowContext ): Promise<ToolExecutionResult> { try { + // Validate input and config + const validatedInput = this.inputSchema.parse(input) + const validatedConfig = this.configSchema.parse(config) + // TODO: Implement webhook execution logic return {Would you like me to generate a complete implementation of the webhook HTTP request logic, including timeout handling, retry logic, and proper error responses?
server/execution-engine/communication-service.ts (1)
123-136: Reminder: Implement stop execution logic.The TODO at line 125 indicates that stop execution functionality is not yet implemented. This is a critical feature for workflow management.
Do you want me to generate an implementation for stopping workflow executions, or should this be tracked in a separate issue?
server/workflow-tools/agent.ts (1)
35-47: Stub implementation returns mock data.The execute method returns a hardcoded response without performing any actual agent interaction. The configured
agentId,prompt, andtimeoutare ignored.This appears to be a placeholder for future implementation. Would you like me to help design the actual agent integration logic, or should this be tracked in a separate issue?
server/execution-engine/index.ts (2)
67-69: Remove commented-out code.These commented export statements should either be uncommented (if needed) or removed entirely to keep the codebase clean.
Apply this diff to remove the commented lines:
-// Named and default export -// export { ExecutionEngine } -// export default ExecutionEngine
52-58: Consider returning comprehensive engine status.The
getEngineStatusmethod only returns service status but not queue, executor, or client status. This may provide an incomplete picture of the engine's health.Consider expanding the status object to include other subsystem states:
static async getEngineStatus() { const { communicationService } = await import("./communication-service") const { messageQueue } = await import("./message-queue") const { executionClient } = await import("./execution-client") return { service: communicationService.getServiceStatus(), queue: messageQueue.getQueueStatus?.() || { status: 'unknown' }, client: { connected: await executionClient.healthCheck() } } }server/workflow-tools/http-request.ts (1)
31-37: Consider adding URL validation in the input schema.While the URL will need runtime SSRF checks, adding basic format validation in the schema provides an early safety layer.
inputSchema = z.object({ - url: z.string(), + url: z.string().url("Must be a valid URL"), method: z.enum(["GET", "POST", "PUT", "DELETE", "PATCH"]).default("GET"), headers: z.record(z.string(), z.string()).optional(), body: z.any().optional(), params: z.record(z.string(), z.string()).optional(), })server/workflow-tools/switch.ts (1)
77-101: Consider using strict equality operators.Lines 81 and 83 use loose equality (
==and!=) which can lead to unexpected type coercion. For example,"5" == 5evaluates to true, which may not be the intended behavior in workflow conditions.If loose equality is intentional for flexible comparisons, document this behavior. Otherwise, use strict equality:
switch (operator) { case "==": - return val1 == val2 + return val1 === val2 case "!=": - return val1 != val2 + return val1 !== val2server/workflow-tools/jira.ts (1)
61-85: Stub implementation - tracking implementation status.The TODO on line 67 indicates this is a placeholder. When implementing the actual Jira integration, ensure you handle authentication errors, rate limiting, and API response validation.
Would you like me to help design the Jira API integration logic or create a separate issue to track this implementation?
server/workflow-tools/slack.ts (1)
57-69: Stub implementation - no actual Slack integration.The execute method returns a static response without making any Slack API calls. The configured channel, username, and emoji are ignored.
Would you like me to help design the Slack API integration using the Slack SDK or webhook approach?
server/workflow-tools/merged-node.ts (1)
41-53: Missing merge logic implementation.The execute method ignores the configured
mergeStrategy,waitForAll, andtimeoutoptions. The tool should implement actual merge logic based on these settings (e.g., merging objects, concatenating arrays, or selecting the latest value).Would you like me to help design the merge logic for different strategies, or should this be tracked as a follow-up task?
server/server.ts (1)
1568-1569: Consider extracting inline arrow functions for better maintainability.The inline arrow functions wrapping
HandleStateChangeTemplateApiare functional but could be more maintainable if extracted as named handlers or created via a factory function.Example refactor:
// Extract as named handlers const handleActivateTemplate = (c: Context) => HandleStateChangeTemplateApi(c, TemplateState.ACTIVE) const handleDeactivateTemplate = (c: Context) => HandleStateChangeTemplateApi(c, TemplateState.INACTIVE) // Then use in routes: .post("/workflow/template/activate", handleActivateTemplate) .post("/workflow/template/deactivate", handleDeactivateTemplate)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (40)
server/api/knowledgeBase.ts(4 hunks)server/api/oauth.ts(1 hunks)server/api/workflow-execution.ts(1 hunks)server/api/workflow-template-step.ts(1 hunks)server/api/workflow-template.ts(1 hunks)server/config.ts(2 hunks)server/db/schema/workflows.ts(8 hunks)server/execution-engine/communication-service.ts(1 hunks)server/execution-engine/execution-client.ts(1 hunks)server/execution-engine/execution-engine-queue.ts(1 hunks)server/execution-engine/index.ts(1 hunks)server/execution-engine/message-queue.ts(1 hunks)server/execution-engine/server.ts(1 hunks)server/execution-engine/step-executor.ts(1 hunks)server/execution-engine/triggers.ts(1 hunks)server/execution-engine/types.ts(1 hunks)server/execution-engine/workflow-executor.ts(1 hunks)server/package.json(2 hunks)server/queue/api-server-queue.ts(1 hunks)server/queue/boss.ts(1 hunks)server/queue/index.ts(9 hunks)server/server.ts(4 hunks)server/types.ts(1 hunks)server/types/workflowTypes.ts(3 hunks)server/workflow-tools/agent.ts(1 hunks)server/workflow-tools/ai-agent.ts(1 hunks)server/workflow-tools/delay.ts(1 hunks)server/workflow-tools/email.ts(1 hunks)server/workflow-tools/form.ts(1 hunks)server/workflow-tools/gmail.ts(1 hunks)server/workflow-tools/http-request.ts(1 hunks)server/workflow-tools/jira.ts(1 hunks)server/workflow-tools/manual-trigger.ts(1 hunks)server/workflow-tools/merged-node.ts(1 hunks)server/workflow-tools/registry.ts(1 hunks)server/workflow-tools/scheduler-trigger.ts(1 hunks)server/workflow-tools/slack.ts(1 hunks)server/workflow-tools/switch.ts(1 hunks)server/workflow-tools/types.ts(1 hunks)server/workflow-tools/webhook.ts(1 hunks)
🧰 Additional context used
🧠 Learnings (10)
📓 Common learnings
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.
📚 Learning: 2025-09-02T16:41:31.729Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowTools.ts:9-14
Timestamp: 2025-09-02T16:41:31.729Z
Learning: In the workflowTools.ts schema, the user prefers to add notNull() constraint to the type field but does not want onDelete cascade behavior for the workflowTemplateId foreign key reference.
Applied to files:
server/workflow-tools/types.tsserver/db/schema/workflows.ts
📚 Learning: 2025-09-12T13:28:43.782Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.
Applied to files:
server/api/workflow-execution.tsserver/server.ts
📚 Learning: 2025-10-27T12:58:52.760Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: frontend/src/components/workflow/WorkflowCard.tsx:47-48
Timestamp: 2025-10-27T12:58:52.760Z
Learning: In the workflow sharing feature, the UserWorkflowRole enum defined in server/shared/types.ts has three valid values: Owner = "owner", Shared = "shared", and Viewer = "viewer". Checking workflow.role === "shared" is valid and correct for detecting workflows that have been shared with a user.
Applied to files:
server/types/workflowTypes.ts
📚 Learning: 2025-10-28T07:51:03.144Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: server/db/schema/workflows.ts:320-320
Timestamp: 2025-10-28T07:51:03.144Z
Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.
Applied to files:
server/db/schema/workflows.tsserver/api/workflow-template-step.tsserver/api/workflow-template.tsserver/server.ts
📚 Learning: 2025-09-04T09:29:29.879Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowServiceConfig.ts:51-53
Timestamp: 2025-09-04T09:29:29.879Z
Learning: In the workflowServiceConfig.ts schema, the user prefers to keep currentWorkflowExeId as a simple uuid reference without adding foreign key constraints or onDelete behaviors to the workflowExe table.
Applied to files:
server/db/schema/workflows.tsserver/execution-engine/workflow-executor.ts
📚 Learning: 2025-09-16T08:57:58.762Z
Learnt from: rahul1841
Repo: xynehq/xyne PR: 843
File: server/server.ts:996-996
Timestamp: 2025-09-16T08:57:58.762Z
Learning: The DownloadFileApi handler in server/api/knowledgeBase.ts properly uses the clId route parameter to scope and validate file access within the correct collection.
Applied to files:
server/api/knowledgeBase.ts
📚 Learning: 2025-05-28T10:55:46.701Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 484
File: server/integrations/google/gmail-worker.ts:293-294
Timestamp: 2025-05-28T10:55:46.701Z
Learning: There are two separate `parseMail` functions in the codebase: one in `server/integrations/google/gmail-worker.ts` with signature `(email, gmail, client, userEmail)` returning `{ mailData, insertedPdfCount, exist }`, and another in `server/integrations/google/gmail/index.ts` with signature `(email, gmail, userEmail, client, tracker?)` returning `{ mailData, exist }`. Each file calls its own local version correctly.
Applied to files:
server/workflow-tools/gmail.ts
📚 Learning: 2025-06-17T08:55:40.153Z
Learnt from: oindrila-b
Repo: xynehq/xyne PR: 557
File: server/integrations/google/gmail-worker.ts:313-317
Timestamp: 2025-06-17T08:55:40.153Z
Learning: In server/integrations/google/gmail-worker.ts, the log message intentionally shows totalMails (cumulative count) while sendStatsUpdate uses insertedMessagesInBatch (batch count). This is by design - logs show total progress for human readability while stats updates track batch progress for system functionality.
Applied to files:
server/workflow-tools/gmail.ts
📚 Learning: 2025-07-10T08:02:41.059Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 651
File: server/tests/extractEmailHeadersVal.test.ts:49-58
Timestamp: 2025-07-10T08:02:41.059Z
Learning: The test file `server/tests/extractEmailHeadersVal.test.ts` is primarily designed to verify email extraction from headers like to, from, cc, and bcc. Message-ID related functionality refactoring is planned for a future PR.
Applied to files:
server/workflow-tools/email.ts
🧬 Code graph analysis (32)
server/workflow-tools/types.ts (2)
server/db/schema/workflows.ts (4)
ToolExecutionStatus(480-480)ToolType(478-478)ToolCategory(479-479)SelectWorkflowTemplate(251-253)server/api/workflow-template.ts (1)
workflowToolType(282-282)
server/execution-engine/triggers.ts (7)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/communication-service.ts (1)
handleManualTrigger(156-178)server/db/schema/workflows.ts (8)
workflowStepExecution(178-204)WorkflowStatus(476-476)toolExecution(207-225)ToolExecutionStatus(480-480)SelectWorkflowTemplate(251-253)workflowStepTemplate(101-124)workflowTool(127-147)ToolType(478-478)server/execution-engine/types.ts (1)
ExecutionPacket(70-78)server/execution-engine/execution-engine-queue.ts (1)
queueNextSteps(150-298)server/workflow-tools/registry.ts (1)
getTool(39-45)server/workflow-tools/types.ts (1)
WorkflowTool(23-30)
server/execution-engine/types.ts (1)
server/workflow-tools/types.ts (1)
ToolExecutionResult(13-18)
server/workflow-tools/jira.ts (2)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)server/db/schema/workflows.ts (3)
ToolType(478-478)ToolCategory(479-479)ToolExecutionStatus(480-480)
server/workflow-tools/ai-agent.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/execution-engine/communication-service.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/message-queue.ts (4)
ExecutionMessage(10-15)ExecutionResponse(24-29)messageQueue(276-276)ExecutionRequest(17-22)server/execution-engine/workflow-executor.ts (1)
workflowExecutor(359-359)server/execution-engine/triggers.ts (1)
handleManualTrigger(18-128)
server/execution-engine/message-queue.ts (2)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/index.ts (1)
messageQueue(6-6)
server/api/workflow-execution.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/db/user.ts (1)
getUserFromJWT(488-500)server/execution-engine/execution-client.ts (1)
executionClient(137-137)server/db/schema/workflows.ts (6)
workflowExecution(150-175)workflowTemplate(71-98)workflowStepExecution(178-204)workflowStepTemplate(101-124)toolExecution(207-225)workflowTool(127-147)
server/types/workflowTypes.ts (1)
server/db/schema/workflows.ts (2)
ToolCategory(479-479)ToolExecutionStatus(480-480)
server/execution-engine/execution-engine-queue.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/types.ts (2)
ExecutionPacket(70-78)StepExecutionResult(81-89)server/execution-engine/step-executor.ts (1)
stepExecutor(555-555)server/db/schema/workflows.ts (4)
workflowExecution(150-175)WorkflowStatus(476-476)workflowStepExecution(178-204)workflowStepTemplate(101-124)
server/workflow-tools/agent.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/http-request.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/switch.ts (2)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/workflow-tools/types.ts (4)
WorkflowTool(23-30)defaultToolConfig(20-20)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/execution-engine/index.ts (3)
server/execution-engine/execution-client.ts (1)
executionClient(137-137)server/execution-engine/workflow-executor.ts (1)
workflowExecutor(359-359)server/execution-engine/communication-service.ts (1)
communicationService(217-217)
server/workflow-tools/delay.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/execution-engine/execution-client.ts (2)
server/execution-engine/message-queue.ts (2)
ExecutionRequest(17-22)messageQueue(276-276)server/queue/boss.ts (1)
boss(4-7)
server/workflow-tools/slack.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/registry.ts (2)
server/workflow-tools/types.ts (1)
ToolRegistry(33-35)server/workflow-tools/email.ts (1)
EmailTool(5-111)
server/workflow-tools/webhook.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/merged-node.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/manual-trigger.ts (1)
server/workflow-tools/types.ts (4)
WorkflowTool(23-30)defaultToolConfig(20-20)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/api/workflow-template-step.ts (5)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/db/schema/workflows.ts (4)
ToolType(478-478)workflowTemplate(71-98)workflowTool(127-147)workflowStepTemplate(101-124)server/api/workflow-template.ts (1)
workflowToolConfigSchema(184-194)server/db/user.ts (1)
getUserFromJWT(488-500)server/workflow-tools/registry.ts (1)
getToolCategory(59-62)
server/execution-engine/step-executor.ts (5)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/types.ts (2)
ExecutionPacket(70-78)StepExecutionResult(81-89)server/workflow-tools/registry.ts (1)
getTool(39-45)server/db/schema/workflows.ts (5)
ToolExecutionStatus(480-480)WorkflowStatus(476-476)workflowTool(127-147)workflowStepExecution(178-204)toolExecution(207-225)server/workflow-tools/types.ts (2)
WorkflowContext(5-10)ToolExecutionResult(13-18)
server/workflow-tools/scheduler-trigger.ts (3)
server/workflow-tools/types.ts (4)
WorkflowTool(23-30)defaultToolConfig(20-20)WorkflowContext(5-10)ToolExecutionResult(13-18)server/db/schema/workflows.ts (4)
ToolType(478-478)ToolCategory(479-479)ToolExecutionStatus(480-480)SelectWorkflowTemplate(251-253)server/execution-engine/message-queue.ts (2)
ExecutionRequest(17-22)messageQueue(276-276)
server/api/workflow-template.ts (5)
server/logger/index.ts (3)
getLoggerWithChild(192-200)Subsystem(15-15)getLogger(36-93)server/workflow-tools/registry.ts (3)
getToolCategory(59-62)getSupportedToolTypes(53-55)getTool(39-45)server/db/schema/workflows.ts (5)
ToolType(478-478)workflowTemplate(71-98)workflowTool(127-147)workflowStepTemplate(101-124)SelectWorkflowTemplate(251-253)server/db/user.ts (1)
getUserFromJWT(488-500)server/execution-engine/triggers.ts (1)
handleTemplateStateChange(131-226)
server/execution-engine/server.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/message-queue.ts (1)
messageQueue(276-276)server/execution-engine/execution-engine-queue.ts (1)
initExecutionEngineQueue(29-39)server/execution-engine/communication-service.ts (1)
communicationService(217-217)
server/workflow-tools/gmail.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/server.ts (4)
server/api/workflow-template.ts (8)
GetAvailableToolTypesApi(126-153)GetToolTypeSchemaApi(156-182)createTemplateSchema(212-219)CreateTemplateApi(285-491)GetTemplateApi(566-667)UpdateTemplateApi(494-563)DeleteTemplateApi(670-730)HandleStateChangeTemplateApi(866-929)server/api/workflow-template-step.ts (8)
addTemplateStepSchema(26-47)AddTemplateStepApiHandler(89-259)updateTemplateStepSchema(49-70)UpdateTemplateStepApiHandler(262-485)deleteTemplateStepSchema(72-74)DeleteTemplateStepApiHandler(487-640)deleteLinkSchema(76-86)deleteLink(643-782)server/api/workflow-execution.ts (6)
ExecuteTemplateHandler(24-53)GetExecutionStatusApi(56-79)GetWorkflowExecutionDetailsApi(164-359)StopExecutionApi(82-108)GetEngineHealthApi(111-132)HandleManualTrigger(135-161)server/execution-engine/message-queue.ts (1)
messageQueue(276-276)
server/workflow-tools/email.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/execution-engine/workflow-executor.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/db/schema/workflows.ts (7)
workflowExecution(150-175)WorkflowStatus(476-476)workflowTemplate(71-98)workflowStepTemplate(101-124)workflowStepExecution(178-204)workflowTool(127-147)ToolCategory(479-479)server/execution-engine/types.ts (1)
ExecutionPacket(70-78)server/execution-engine/execution-engine-queue.ts (1)
sendExecutionPacket(91-116)
server/workflow-tools/form.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-10)ToolExecutionResult(13-18)
server/queue/index.ts (2)
server/queue/api-server-queue.ts (1)
boss(9-12)server/queue/boss.ts (1)
boss(4-7)
🔇 Additional comments (10)
server/config.ts (1)
10-10: LGTM!The execution engine configuration follows the existing pattern for other services (sync server) and provides appropriate defaults. The centralized configuration approach enables proper separation of concerns for the new execution engine subsystem.
Also applies to: 15-15, 280-281
server/workflow-tools/delay.ts (1)
21-101: LGTM on overall implementation.The DelayTool has well-defined schemas and proper error handling. The unit conversion logic is clear and the implementation correctly handles input overrides.
server/workflow-tools/switch.ts (1)
10-193: Well-implemented conditional routing logic.The SwitchTool demonstrates good practices with comprehensive logging, proper error handling, multiple evaluation modes, and clear helper methods for value extraction and comparison.
server/workflow-tools/jira.ts (1)
23-27: Ensure API tokens are not logged or exposed.The
apiTokenfield contains sensitive credentials. When this tool is implemented, verify that:
- API tokens are never logged (even in error messages)
- Tokens are stored securely (encrypted at rest)
- Tokens are transmitted only over HTTPS
- Consider using OAuth instead of API tokens if possible
server/workflow-tools/registry.ts (1)
20-36: Registry wiring looks solid.
The consolidated registry neatly instantiates the tool catalog, and the helper exports keep the API consistent.server/execution-engine/types.ts (1)
69-89: Type definitions line up with the queue contract.
ExecutionPacketandStepExecutionResultcapture the runtime shape used across the engine; no issues spotted.server/server.ts (4)
337-358: LGTM: Workflow API imports are properly structured.The imports for workflow template, template step, and execution APIs are well-organized and all entities are used in the route definitions below.
1583-1583: LGTM: Manual trigger endpoint is properly secured.The manual trigger route is correctly placed after
AuthMiddlewareand follows the expected pattern for triggering workflow steps manually.Based on learnings.
2426-2429: LGTM: Message queue initialization is properly implemented.The message queue is initialized with:
- Dynamic import for lazy loading
- Proper awaiting of the initialization call
- Clear commenting explaining the inter-service communication purpose
- Appropriate error handling via the
init().catch()block at line 2463
1537-1573: Route pattern inconsistency confirmed between old and new workflow endpoints.The codebase has conflicting URL patterns for workflow resources:
- Templates:
/workflow/templates/:templateId(lines 1485-1487, 1493, plural) vs./workflow/template/:templateId(lines 1544-1546, singular)- Executions:
/workflow/executions/:executionId(line 1513, plural) vs./workflow/execution/:executionId/*(lines 1570-1572, singular)- API handler mismatch: Internal code at
server/api/workflow.ts:792generates URLs using plural form (/api/v1/workflow/executions/${execution.id}/status), which conflicts with the new singular endpoint patternThis duplication creates API confusion, maintenance complexity, and potential routing conflicts. Standardize all workflow endpoints to consistently use either plural or singular resource names.
⛔ Skipped due to learnings
Learnt from: devesh-juspay Repo: xynehq/xyne PR: 821 File: server/server.ts:771-837 Timestamp: 2025-09-12T13:28:43.782Z Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.Learnt from: Sithaarth24 Repo: xynehq/xyne PR: 1127 File: server/db/schema/workflows.ts:320-320 Timestamp: 2025-10-28T07:51:03.144Z Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.
| export const HandleManualTrigger = async (c: Context) => { | ||
| const { workflowId, stepId } = c.req.param() | ||
|
|
||
| try { | ||
| Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`) | ||
|
|
||
| // Use execution client to trigger the step (same pattern as ExecuteTemplateHandler) | ||
| const result = await executionClient.triggerManualStep(workflowId, stepId, "manual") | ||
|
|
||
| Logger.info(`✅ Manual trigger processed for step ${stepId}`) | ||
|
|
||
| return c.json({ | ||
| success: true, | ||
| message: "Manual trigger completed successfully", | ||
| stepId: stepId, | ||
| workflowId: workflowId, | ||
| result: result | ||
| }) |
There was a problem hiding this comment.
Enforce authentication before manual triggers
This handler never validates the caller’s JWT, so anyone who can hit the route can manually fire workflow steps—an authorization bypass that contradicts our requirement that all workflow endpoints be protected. Please authenticate first (even if the return value isn’t used) so unauthorized requests fail immediately, e.g.:
export const HandleManualTrigger = async (c: Context) => {
const { workflowId, stepId } = c.req.param()
try {
+ await getUserFromJWT(db, c.get(JwtPayloadKey))
Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`)Based on learnings.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In server/api/workflow-execution.ts around lines 135 to 152, the handler
triggers manual workflow steps without authenticating the caller; add an
explicit authentication check at the top of the handler (await the
JWT/authorization validation for the incoming request and short-circuit with a
401/403 response on failure) so unauthorized callers cannot proceed, ensure the
auth call is awaited before calling executionClient.triggerManualStep, and keep
the existing logging and success response only after authentication succeeds.
server/api/workflow-template.ts
Outdated
There was a problem hiding this comment.
Fix root-node detection in template validation
validateTemplateLogic builds workflowNodeNames from node.name but compares them to connection UUIDs (source/target). Because the identifiers never match, every node is treated as “root”, so the trigger-at-start rule is silently skipped. Invalid templates (no trigger on actual entry points) now pass and will fail later at execution time. Please base the root detection on the node UUIDs returned by extractUUIDFromArray instead of the human-readable names.
Update the logic to track UUIDs throughout the DFS and root checks so only genuine start nodes are accepted.
| import { workflowExecutor } from "./workflow-executor" | ||
| import { handleManualTrigger } from "./triggers" | ||
|
|
||
| const Logger = getLogger(Subsystem.WorkflowApi) |
There was a problem hiding this comment.
Incorrect logger subsystem.
The logger uses Subsystem.WorkflowApi but this is the execution engine communication service. Based on line 9 in server/execution-engine/server.ts, the correct subsystem should be Subsystem.ExecutionEngine.
Apply this diff:
-const Logger = getLogger(Subsystem.WorkflowApi)
+const Logger = getLogger(Subsystem.ExecutionEngine)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const Logger = getLogger(Subsystem.WorkflowApi) | |
| const Logger = getLogger(Subsystem.ExecutionEngine) |
🤖 Prompt for AI Agents
In server/execution-engine/communication-service.ts around line 7 the logger is
created with the wrong subsystem (Subsystem.WorkflowApi); change it to use
Subsystem.ExecutionEngine so logs are correctly attributed to the execution
engine communication service. Update the getLogger call to pass
Subsystem.ExecutionEngine and ensure the Subsystem import (if present) is not
accidentally altered.
| private async handleManualTrigger(request: { workflowId: string, stepId: string, triggeredBy: string }): Promise<ExecutionResponse> { | ||
| try { | ||
| // Create a mock context object with the required parameters | ||
| const mockContext = { | ||
| req: { | ||
| param: () => ({ workflowId: request.workflowId, stepId: request.stepId }) | ||
| }, | ||
| json: (data: any) => data | ||
| } as any | ||
|
|
||
| const result = await handleManualTrigger(mockContext) | ||
|
|
||
| return { | ||
| success: true, | ||
| data: result | ||
| } | ||
| } catch (error) { | ||
| return { | ||
| success: false, | ||
| error: error instanceof Error ? error.message : "Failed to trigger manual step" | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Brittle mock context bypasses type safety.
Lines 158-164 create a mock context object with as any to call handleManualTrigger, which expects a Hono Context. This is fragile and bypasses TypeScript's type checking.
Consider refactoring handleManualTrigger in server/execution-engine/triggers.ts to extract the business logic into a separate function that doesn't depend on the Hono Context:
// In triggers.ts - add new function
export const triggerManualStep = async (
workflowId: string,
stepId: string,
triggeredBy: string
) => {
// Move lines 17-106 from handleManualTrigger here
// Return the result directly without c.json()
}
// Then handleManualTrigger becomes:
export const handleManualTrigger = async (c: Context) => {
const { workflowId, stepId } = c.req.param()
try {
const result = await triggerManualStep(workflowId, stepId, "manual")
return c.json(result)
} catch (error) {
// error handling
}
}Then in communication-service.ts:
private async handleManualTrigger(request: { workflowId: string, stepId: string, triggeredBy: string }): Promise<ExecutionResponse> {
try {
- // Create a mock context object with the required parameters
- const mockContext = {
- req: {
- param: () => ({ workflowId: request.workflowId, stepId: request.stepId })
- },
- json: (data: any) => data
- } as any
-
- const result = await handleManualTrigger(mockContext)
+ const result = await triggerManualStep(
+ request.workflowId,
+ request.stepId,
+ request.triggeredBy
+ )
return {
success: true,
data: result
}🤖 Prompt for AI Agents
In server/execution-engine/communication-service.ts around lines 156 to 178, the
code builds a brittle mock Hono Context with "as any" to call
handleManualTrigger; instead extract the business logic out of
handleManualTrigger into a new exported, context-free function (e.g.,
triggerManualStep(workflowId: string, stepId: string, triggeredBy: string):
Promise<YourResultType>) in server/execution-engine/triggers.ts, refactor
handleManualTrigger to parse params from the Hono Context and call
triggerManualStep then return c.json(result), and update communication-service
to call the new triggerManualStep directly (passing request.workflowId,
request.stepId, request.triggeredBy) and handle its typed result/error—this
removes the mock context, preserves TypeScript safety, and requires updating
imports and types accordingly.
| async execute( | ||
| input: Record<string, any>, | ||
| config: Record<string, any>, | ||
| workflowContext: WorkflowContext | ||
| ): Promise<ToolExecutionResult> { | ||
| return { | ||
| status: ToolExecutionStatus.COMPLETED, | ||
| output: { | ||
| message: "Gmail task completed", | ||
| data: input, | ||
| }, | ||
| } | ||
| } |
There was a problem hiding this comment.
Align execute output with the declared schema.
outputSchema (Lines 48-55) requires to, subject, body, sentAt, and status, but the implementation on Lines 73-77 only returns { message, data }. Any downstream validator consuming the schema will break. Please populate the required fields from config/input and include a sentAt timestamp and status string.
Apply this patch:
- async execute(
- input: Record<string, any>,
- config: Record<string, any>,
- workflowContext: WorkflowContext
- ): Promise<ToolExecutionResult> {
- return {
- status: ToolExecutionStatus.COMPLETED,
- output: {
- message: "Gmail task completed",
- data: input,
- },
- }
- }
+ async execute(
+ input: Record<string, any>,
+ config: Record<string, any>,
+ workflowContext: WorkflowContext
+ ): Promise<ToolExecutionResult> {
+ const to = config.to ?? []
+ const subject = input.subject ?? ""
+ const body = input.body ?? ""
+
+ return {
+ status: ToolExecutionStatus.COMPLETED,
+ output: {
+ messageId: workflowContext.currentToolId,
+ to,
+ subject,
+ body,
+ sentAt: new Date().toISOString(),
+ status: "sent",
+ attachments: input.attachments ?? [],
+ message: "Gmail task completed",
+ },
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async execute( | |
| input: Record<string, any>, | |
| config: Record<string, any>, | |
| workflowContext: WorkflowContext | |
| ): Promise<ToolExecutionResult> { | |
| return { | |
| status: ToolExecutionStatus.COMPLETED, | |
| output: { | |
| message: "Gmail task completed", | |
| data: input, | |
| }, | |
| } | |
| } | |
| async execute( | |
| input: Record<string, any>, | |
| config: Record<string, any>, | |
| workflowContext: WorkflowContext | |
| ): Promise<ToolExecutionResult> { | |
| const to = config.to ?? [] | |
| const subject = input.subject ?? "" | |
| const body = input.body ?? "" | |
| return { | |
| status: ToolExecutionStatus.COMPLETED, | |
| output: { | |
| messageId: workflowContext.currentToolId, | |
| to, | |
| subject, | |
| body, | |
| sentAt: new Date().toISOString(), | |
| status: "sent", | |
| attachments: input.attachments ?? [], | |
| message: "Gmail task completed", | |
| }, | |
| } | |
| } |
🤖 Prompt for AI Agents
In server/workflow-tools/gmail.ts around lines 67 to 79, the execute() return
value currently returns {message, data} but must match the declared outputSchema
(to, subject, body, sentAt, status); update the returned output to populate
to/subject/body from config or input (prefer config if provided, fall back to
input), set sentAt to the current ISO timestamp (new Date().toISOString()),
include a status string such as "sent" (or "failed" on error), and retain any
helpful message or extra data under a separate field; ensure the final shape
includes keys: to, subject, body, sentAt, status (and optional message/data) so
downstream validation succeeds.
| async execute( | ||
| input: Record<string, any>, | ||
| config: Record<string, any>, | ||
| workflowContext: WorkflowContext | ||
| ): Promise<ToolExecutionResult> { | ||
| try { | ||
| // TODO: Implement HTTP request execution logic | ||
| return { | ||
| status: ToolExecutionStatus.COMPLETED, | ||
| output: { | ||
| status: 200, | ||
| data: { message: "HTTP request executed successfully" }, | ||
| duration: 100, | ||
| } | ||
| } | ||
| } catch (error) { | ||
| return { | ||
| status: ToolExecutionStatus.FAILED, | ||
| output: { | ||
| error: error instanceof Error ? error.message : "Unknown error" | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Stub implementation with critical security implications.
The TODO indicates this is a placeholder, but when implemented, this tool will need SSRF (Server-Side Request Forgery) protection to prevent requests to internal/private networks.
Before implementing the HTTP request logic, ensure you:
- Validate and sanitize the URL to block private IP ranges (127.0.0.0/8, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, etc.)
- Implement DNS rebinding protection
- Restrict allowed protocols (http/https only)
- Apply timeout and size limits
- Consider using an allowlist for permitted domains
Would you like me to generate a secure implementation or create a separate issue for this?
🤖 Prompt for AI Agents
In server/workflow-tools/http-request.ts around lines 52 to 75, the execute
method is a stub that lacks any SSRF protections; replace the placeholder with a
secure HTTP request implementation that (1) strictly validates and normalizes
the input URL and rejects non-http(s) schemes, (2) resolves the hostname and
blocks requests to private/internal IP ranges (127.0.0.0/8, 10.0.0.0/8,
172.16.0.0/12, 192.168.0.0/16, link-local, and IPv6 equivalents), (3) defends
against DNS rebinding by resolving DNS separately and ensuring the resolved IP
remains allowed for the entire request, (4) enforces timeouts and response size
limits and parses/validates headers, and (5) optionally enforces an allowlist of
domains and returns clear error states via ToolExecutionStatus.FAILED when
checks fail; implement these checks before performing the network call and
ensure all failures surface a safe error message.
| inputSchema = z.object({}) | ||
| outputSchema = z.object({}) | ||
| configSchema = z.object({}) |
There was a problem hiding this comment.
Empty schemas provide no validation.
All schemas are empty z.object({}), which means no runtime validation. This is particularly important for a merge node that needs to handle multiple inputs.
Consider defining schemas that reflect the merge behavior:
- inputSchema = z.object({})
- outputSchema = z.object({})
- configSchema = z.object({})
+ inputSchema = z.object({
+ // Schema should accommodate multiple input shapes
+ })
+
+ outputSchema = z.object({
+ merged: z.record(z.string(), z.any()),
+ sources: z.array(z.string()).optional()
+ })
+
+ configSchema = z.object({
+ inputCount: z.number().min(1).max(5).default(1),
+ mergeStrategy: z.enum(["merge", "concat", "latest"]).default("merge"),
+ waitForAll: z.boolean().default(true),
+ timeout: z.number().positive().default(300000)
+ })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| inputSchema = z.object({}) | |
| outputSchema = z.object({}) | |
| configSchema = z.object({}) | |
| inputSchema = z.object({ | |
| // Schema should accommodate multiple input shapes | |
| }) | |
| outputSchema = z.object({ | |
| merged: z.record(z.string(), z.any()), | |
| sources: z.array(z.string()).optional() | |
| }) | |
| configSchema = z.object({ | |
| inputCount: z.number().min(1).max(5).default(1), | |
| mergeStrategy: z.enum(["merge", "concat", "latest"]).default("merge"), | |
| waitForAll: z.boolean().default(true), | |
| timeout: z.number().positive().default(300000) | |
| }) |
🤖 Prompt for AI Agents
In server/workflow-tools/merged-node.ts around lines 37 to 39 the three schemas
are empty (inputSchema = z.object({}), outputSchema = z.object({}), configSchema
= z.object({})), which provides no runtime validation; replace them with
concrete schemas that reflect the merge node behavior — for inputSchema define
the expected shape for multiple upstream payloads (e.g., a record keyed by input
name or an array of objects with required fields) and validate types for the
data you will merge, for outputSchema define the merged result shape (required
keys and value types) and any post-merge invariants, and for configSchema define
merge strategy options (e.g., conflictResolution: z.enum([...]), keysToKeep:
z.array(z.string()).optional()) ; update any derived TypeScript types to use
z.infer<typeof ...>, add minimal unit tests for valid/invalid inputs and ensure
calling code adapts to the new schema errors (throw/return) so the node fails
fast on invalid data.
| defaultConfig = { | ||
| inputCount: 1, | ||
| outputCount: 1, | ||
| options: { | ||
| channel: { | ||
| type: "string", | ||
| default: "", | ||
| optional: false | ||
| }, | ||
| username: { | ||
| type: "string", | ||
| default: "Workflow Bot", | ||
| optional: true | ||
| }, | ||
| iconEmoji: { | ||
| type: "string", | ||
| default: ":robot_face:", | ||
| optional: true | ||
| }, | ||
| threadTs: { | ||
| type: "string", | ||
| default: "", | ||
| optional: true | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Missing authentication configuration.
The Slack tool lacks authentication fields (bot token or webhook URL) in its defaultConfig. Without these, the tool cannot connect to Slack when implemented.
Add authentication to the defaultConfig:
defaultConfig = {
inputCount: 1,
outputCount: 1,
options: {
+ token: {
+ type: "string",
+ default: "",
+ optional: false
+ },
+ // OR for webhook-based approach:
+ // webhookUrl: {
+ // type: "string",
+ // default: "",
+ // optional: false
+ // },
channel: {
type: "string",
default: "",
optional: false
},📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| defaultConfig = { | |
| inputCount: 1, | |
| outputCount: 1, | |
| options: { | |
| channel: { | |
| type: "string", | |
| default: "", | |
| optional: false | |
| }, | |
| username: { | |
| type: "string", | |
| default: "Workflow Bot", | |
| optional: true | |
| }, | |
| iconEmoji: { | |
| type: "string", | |
| default: ":robot_face:", | |
| optional: true | |
| }, | |
| threadTs: { | |
| type: "string", | |
| default: "", | |
| optional: true | |
| } | |
| } | |
| } | |
| defaultConfig = { | |
| inputCount: 1, | |
| outputCount: 1, | |
| options: { | |
| token: { | |
| type: "string", | |
| default: "", | |
| optional: false | |
| }, | |
| // OR for webhook-based approach: | |
| // webhookUrl: { | |
| // type: "string", | |
| // default: "", | |
| // optional: false | |
| // }, | |
| channel: { | |
| type: "string", | |
| default: "", | |
| optional: false | |
| }, | |
| username: { | |
| type: "string", | |
| default: "Workflow Bot", | |
| optional: true | |
| }, | |
| iconEmoji: { | |
| type: "string", | |
| default: ":robot_face:", | |
| optional: true | |
| }, | |
| threadTs: { | |
| type: "string", | |
| default: "", | |
| optional: true | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
In server/workflow-tools/slack.ts around lines 9 to 34, the defaultConfig is
missing Slack authentication fields; add options for botToken and webhookUrl
(both type "string", default "", optional true) to defaultConfig, document that
botToken takes precedence over webhookUrl, and update any validation to error at
runtime if neither is provided so callers must supply at least one; do not
hardcode secrets—prefer reading from environment when omitted and mark these
fields as sensitive in any config metadata.
| } | ||
|
|
||
| // Get template with access validation | ||
| private async getTemplate(templateId: string, workspaceId: number) { |
There was a problem hiding this comment.
Do we need all this db helper functions like getTemplate createWorkflowExecution since these are already defined in the /db/workflow.ts file? Also this getTemplate is supposed to do access validation but its not checking for user access to the template
There was a problem hiding this comment.
i will connect with you on how this access validation is working, i need to implement that
There was a problem hiding this comment.
Actionable comments posted: 5
♻️ Duplicate comments (11)
server/execution-engine/triggers.ts (2)
45-55: Scope the pending tool lookup to this step.Line 50 still filters only by
workflowExecutionIdandstatus, so any other WAITING tool execution in the same workflow can be marked COMPLETE, dequeuing the wrong branch. Please guard the lookup with this step’stoolExecIds, and short-circuit when the list is empty.+ const toolExecIds = step.toolExecIds ?? [] + if (toolExecIds.length === 0) { + Logger.warn(`Step ${stepId} has no pending tool executions`) + return c.json({ + error: "No pending tool execution found" + }, 404) + } + const [toolExec] = await db .select() .from(toolExecution) .where( and( eq(toolExecution.workflowExecutionId, workflowId), - eq(toolExecution.status, ToolExecutionStatus.AWAITING_USER_INPUT) + eq(toolExecution.status, ToolExecutionStatus.AWAITING_USER_INPUT), + sql`${toolExecution.id} = ANY(${sql.array(toolExecIds, "uuid")})` ) ) .limit(1)
135-147: Restrict template triggers to root steps again.Lines 139-143 now rely solely on
toolCategory, so non-root trigger steps fire on every state change. That revives the bug we discussed earlier—downstream triggers become active without their predecessors. Please restore theprevStepIdsnull/empty filter.const triggerSteps = await db .select() .from(workflowStepTemplate) .where( and( eq(workflowStepTemplate.workflowTemplateId, template.id), - eq(workflowStepTemplate.toolCategory, ToolCategory.TRIGGER), + eq(workflowStepTemplate.toolCategory, ToolCategory.TRIGGER), + or( + sql`${workflowStepTemplate.prevStepIds} IS NULL`, + sql`array_length(${workflowStepTemplate.prevStepIds}, 1) = 0` + ) ) )server/api/workflow-execution.ts (1)
115-133: Authenticate manual trigger requests.Line 120 logs the manual trigger without validating the caller, leaving this endpoint open to anyone who can hit it. We require every workflow API to authenticate first—please await
getUserFromJWT(and fail fast on errors) before dispatching the manual trigger. Based on learningsexport const HandleManualTrigger = async (c: Context) => { const { workflowId, stepId } = c.req.param() try { + await getUserFromJWT(db, c.get(JwtPayloadKey)) Logger.info(`🔴 Manual trigger requested for workflow ${workflowId}, step ${stepId}`) // Use execution client to trigger the step (same pattern as ExecuteTemplateHandler) const result = await executionClient.triggerManualStep(workflowId, stepId, "manual")server/execution-engine/execution-engine-queue.ts (1)
48-83: Handle single pg-boss job per invocation.
pg-boss.workdelivers a singlejob, so iteratingjobsthrowsTypeError: job is not iterableand the worker never runs. Replace the loop with direct single-job handling.- await executionBoss.work(ExecutionQueue, async (jobs) => { - for (const job of jobs) { - const packet = job.data as ExecutionPacket - try { + await executionBoss.work(ExecutionQueue, async (job) => { + const packet = job.data as ExecutionPacket + try { Logger.info(`🔄 EXECUTION WORKER PICKED UP PACKET:`) Logger.info(` Template ID: ${packet.template_id}`) Logger.info(` Workflow ID: ${packet.workflow_id}`) Logger.info(` Step ID: ${packet.step_id}`) Logger.info(` Previous Tool ID: ${packet.previous_tool_id}`) Logger.info(` Input: ${JSON.stringify(packet.input)}`) Logger.info(` Job ID: ${job.id}`) // Execute the step using StepExecutor const result = await stepExecutor.executeStep(packet) // Log execution result Logger.info(`📋 Step execution result: ${JSON.stringify(result)}`) // Queue next steps if execution should continue if (result.nextAction === 'continue') { await queueNextSteps(packet, result) } else if (result.nextAction === 'wait_for_input') { Logger.info(`⏳ Step is waiting for additional inputs, not queuing next steps`) } Logger.info(`✅ Packet processing completed for job ${job.id}`) - } catch (error) { - Logger.error(error, `❌ Error processing execution packet for job ${job.id}`) - markWorkflowStatus(packet.workflow_id,false).catch((err) => { - Logger.error(err, `Failed to mark workflow ${packet.workflow_id} as FAILED after execution error`) - }) - } - } + } catch (error) { + Logger.error(error, `❌ Error processing execution packet for job ${job.id}`) + markWorkflowStatus(packet.workflow_id,false).catch((err) => { + Logger.error(err, `Failed to mark workflow ${packet.workflow_id} as FAILED after execution error`) + }) + } })server/execution-engine/communication-service.ts (2)
7-7: Incorrect logger subsystem (previously flagged).The logger is initialized with
Subsystem.WorkflowApi, but this is the execution engine communication service. Based on the module's purpose and sibling files (e.g., server/execution-engine/server.ts usesSubsystem.ExecutionEngine), the correct subsystem should beSubsystem.ExecutionEngine.Apply this diff:
-const Logger = getLogger(Subsystem.WorkflowApi) +const Logger = getLogger(Subsystem.ExecutionEngine)Based on past review comments.
135-157: Brittle mock context bypasses type safety (previously flagged).Lines 138-143 create a mock context object with
as anyto callhandleManualTrigger, which expects a Hono Context. This is fragile and bypasses TypeScript's type checking. The recommended approach from previous reviews is to extract the business logic fromhandleManualTriggerinto a separate context-free function that can be called directly by both this service and the Hono API handler.Consider refactoring as suggested in previous reviews: extract a
triggerManualStep(workflowId, stepId, triggeredBy)function in server/execution-engine/triggers.ts that contains the core logic, then call it directly without the mock context.Based on past review comments.
server/execution-engine/workflow-executor.ts (2)
224-232: Missing parent step linkage in execution graph (previously flagged).
createStepExecutionsupdatesprevStepIdsandnextStepIdsbut never setsparentStepIdon the created step execution rows. This means the parent-child hierarchy from the template is not preserved in the execution records. Any downstream logic that relies on the execution graph parent relationships (e.g., traversing up the tree, computing breadcrumbs) will lose the parent linkage even though the template modeled it.Apply this diff to map template parent IDs to execution IDs:
// Update the step execution with relationships and metadata + const parentStepExecId = + templateStep.parentStepId + ? stepExecutions.get(templateStep.parentStepId) ?? null + : null + await db .update(workflowStepExecution) .set({ + parentStepId: parentStepExecId, prevStepIds: prevStepExecIds, nextStepIds: nextStepExecIds, metadata: updatedMetadata, }) .where(eq(workflowStepExecution.id, stepExecId))Based on past review comments.
73-86: Incomplete access validation in getTemplate.The
getTemplatemethod checks that the template belongs to the correct workspace (line 80) but doesn't verify user-specific permissions. Per past review comments, this method is supposed to perform access validation, but it's not checking whether the requesting user has permission to access the template (viauserWorkflowPermissionsorisPublicchecks).Consider using the existing
getWorkflowTemplateByIdWithPermissionCheckhelper from server/db/workflow.ts (lines 67-108 of relevant snippets) instead of this custom query, or implement similar permission logic here.Based on past review comments.
server/execution-engine/step-executor.ts (2)
108-111: RUNNING steps incorrectly marked as COMPLETED (previously flagged).When a tool reports
ToolExecutionStatus.RUNNING, the code immediately returnsnextAction: 'continue'and persists the step asWorkflowStatus.COMPLETED(line 110). This prematurely advances the workflow while the tool is still executing, guaranteeing downstream steps consume incomplete data and making it impossible to resume correctly.Apply this diff to keep the step in a non-terminal state:
case ToolExecutionStatus.RUNNING: - nextAction = 'continue' - stepStatus = WorkflowStatus.COMPLETED + nextAction = 'wait_for_input' + stepStatus = WorkflowStatus.ACTIVE breakBased on past review comments.
397-426: Partial inputs not cleared after combining results (previously flagged).Once all required inputs arrive and
combinedInputis built (lines 398-419), the code never clearspartialInputsfrom metadata. On the next execution cycle, those stale entries make the step believe every route is already satisfied, causing it to reuse old tool execution IDs and feed last-run outputs forward. This corrupts multi-input workflows after the first run.After building
combinedInputand before returningshouldExecute: true, reset the consumed partial input state within the same transaction. Update the step metadata to clearpartialInputsto an empty object.Based on past review comments.
server/api/workflow-template.ts (1)
287-297: Fragile connection reconstruction using non-unique names (previously flagged).Reconstructing connections using step names (e.g.,
${step.name}-${nextStep.name}on line 293) is fragile because step names are not guaranteed to be unique within a workflow template. If two steps have the same name, this logic will lead to incorrect connections being created or ambiguous connection identification.Use step IDs instead, which are guaranteed to be unique. The step already has
nextStepIdsarray with UUIDs, so you could reconstruct connections directly from those IDs without name-based lookups.Based on past review comments.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
server/api/workflow-execution.ts(1 hunks)server/api/workflow-template-step.ts(1 hunks)server/api/workflow-template.ts(1 hunks)server/db/schema/workflows.ts(8 hunks)server/db/workflow.ts(2 hunks)server/execution-engine/communication-service.ts(1 hunks)server/execution-engine/execution-client.ts(1 hunks)server/execution-engine/execution-engine-queue.ts(1 hunks)server/execution-engine/message-queue.ts(1 hunks)server/execution-engine/step-executor.ts(1 hunks)server/execution-engine/triggers.ts(1 hunks)server/execution-engine/types.ts(1 hunks)server/execution-engine/workflow-executor.ts(1 hunks)server/package.json(2 hunks)server/server.ts(4 hunks)server/workflow-tools/agent.ts(1 hunks)server/workflow-tools/ai-agent.ts(1 hunks)server/workflow-tools/delay.ts(1 hunks)server/workflow-tools/manual-trigger.ts(1 hunks)server/workflow-tools/merged-node.ts(1 hunks)server/workflow-tools/registry.ts(1 hunks)server/workflow-tools/scheduler-trigger.ts(1 hunks)server/workflow-tools/slack.ts(1 hunks)server/workflow-tools/switch.ts(1 hunks)server/workflow-tools/types.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- server/workflow-tools/agent.ts
- server/workflow-tools/slack.ts
- server/workflow-tools/ai-agent.ts
- server/workflow-tools/merged-node.ts
- server/api/workflow-template-step.ts
- server/workflow-tools/switch.ts
- server/workflow-tools/manual-trigger.ts
- server/workflow-tools/types.ts
- server/package.json
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-10-17T08:54:59.756Z
Learnt from: debajyoti0606
Repo: xynehq/xyne PR: 1075
File: server/workflowScriptExecutorTool.ts:36-77
Timestamp: 2025-10-17T08:54:59.756Z
Learning: In server/workflowScriptExecutorTool.ts, the validateJson function's escape sequence normalization logic (lines 41-52 that replace \n, \r, \t, \f, \b, \", and \\ before parsing) is intentionally designed to handle edge cases and should not be flagged as potentially corrupting valid JSON. The double-escaping behavior is expected.
Applied to files:
server/api/workflow-execution.ts
📚 Learning: 2025-09-12T13:28:43.782Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 821
File: server/server.ts:771-837
Timestamp: 2025-09-12T13:28:43.782Z
Learning: All workflow API endpoints should be protected with AuthMiddleware to prevent unauthorized access to template creation, execution, tool management, and other sensitive workflow operations.
Applied to files:
server/api/workflow-execution.tsserver/api/workflow-template.tsserver/server.ts
📚 Learning: 2025-06-16T11:56:22.752Z
Learnt from: oindrila-b
Repo: xynehq/xyne PR: 545
File: server/integrations/google/index.ts:137-145
Timestamp: 2025-06-16T11:56:22.752Z
Learning: In server/integrations/google/index.ts, both Logger (base logger) and loggerWithChild (factory for email-scoped child loggers) are intentionally maintained to handle different logging scenarios - one for when email context is not available and one for when it is available.
Applied to files:
server/execution-engine/communication-service.ts
📚 Learning: 2025-09-02T16:41:31.729Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowTools.ts:9-14
Timestamp: 2025-09-02T16:41:31.729Z
Learning: In the workflowTools.ts schema, the user prefers to add notNull() constraint to the type field but does not want onDelete cascade behavior for the workflowTemplateId foreign key reference.
Applied to files:
server/db/workflow.tsserver/db/schema/workflows.ts
📚 Learning: 2025-10-28T07:51:03.144Z
Learnt from: Sithaarth24
Repo: xynehq/xyne PR: 1127
File: server/db/schema/workflows.ts:320-320
Timestamp: 2025-10-28T07:51:03.144Z
Learning: In the workflow template architecture, the API layer handles conversion between external UUIDs (used by frontend) and internal integer IDs (used in database). Frontend types expect UUID strings for workflow template IDs, while internal schemas like createWorkflowStepTemplateSchema expect integers. The publicWorkflowTemplateSchema maps external_id → id for API responses.
Applied to files:
server/db/workflow.tsserver/db/schema/workflows.tsserver/api/workflow-template.tsserver/server.ts
📚 Learning: 2025-09-04T09:29:29.879Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowServiceConfig.ts:51-53
Timestamp: 2025-09-04T09:29:29.879Z
Learning: In the workflowServiceConfig.ts schema, the user prefers to keep currentWorkflowExeId as a simple uuid reference without adding foreign key constraints or onDelete behaviors to the workflowExe table.
Applied to files:
server/db/schema/workflows.tsserver/execution-engine/workflow-executor.ts
📚 Learning: 2025-05-28T10:47:41.020Z
Learnt from: naSim087
Repo: xynehq/xyne PR: 484
File: server/integrations/google/sync.ts:222-222
Timestamp: 2025-05-28T10:47:41.020Z
Learning: The functions `handleGoogleDriveChange` and `getDriveChanges` in `server/integrations/google/sync.ts` are intentionally exported for future changes, even though they are not currently being imported by other modules.
Applied to files:
server/server.ts
📚 Learning: 2025-09-02T16:37:08.246Z
Learnt from: devesh-juspay
Repo: xynehq/xyne PR: 758
File: server/db/schema/workflowExecution.ts:29-29
Timestamp: 2025-09-02T16:37:08.246Z
Learning: In the Xyne workflow system schema, cascade delete behavior should not be added to the foreign key relationship between workflowStepExe.workflowExeId and workflowExe.id. Manual control over step deletion is preferred over automatic cascade behavior.
Applied to files:
server/execution-engine/workflow-executor.ts
🧬 Code graph analysis (15)
server/workflow-tools/delay.ts (1)
server/workflow-tools/types.ts (3)
WorkflowTool(23-30)WorkflowContext(5-9)ToolExecutionResult(12-18)
server/execution-engine/message-queue.ts (1)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)
server/api/workflow-execution.ts (5)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/db/user.ts (1)
getUserFromJWT(488-500)server/db/workflow.ts (1)
getWorkflowTemplateByIdWithPermissionCheck(68-109)server/execution-engine/execution-client.ts (1)
executionClient(119-119)server/db/schema/workflows.ts (6)
workflowExecution(153-178)workflowTemplate(71-98)workflowStepExecution(181-210)workflowStepTemplate(101-128)toolExecution(213-230)workflowTool(131-150)
server/execution-engine/types.ts (1)
server/workflow-tools/types.ts (1)
ToolExecutionResult(12-18)
server/execution-engine/triggers.ts (7)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/communication-service.ts (1)
handleManualTrigger(135-157)server/db/schema/workflows.ts (8)
workflowStepExecution(181-210)WorkflowStatus(481-481)toolExecution(213-230)ToolExecutionStatus(485-485)SelectWorkflowTemplate(256-258)workflowStepTemplate(101-128)ToolCategory(484-484)ToolType(483-483)server/execution-engine/types.ts (1)
ExecutionPacket(70-77)server/execution-engine/execution-engine-queue.ts (1)
queueNextSteps(149-230)server/workflow-tools/registry.ts (1)
getTool(39-45)server/workflow-tools/types.ts (1)
WorkflowTool(23-30)
server/execution-engine/execution-client.ts (2)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/message-queue.ts (2)
ExecutionRequest(17-22)messageQueue(289-289)
server/execution-engine/communication-service.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/message-queue.ts (4)
ExecutionMessage(10-15)ExecutionResponse(24-29)messageQueue(289-289)ExecutionRequest(17-22)server/execution-engine/workflow-executor.ts (1)
workflowExecutor(312-312)server/execution-engine/triggers.ts (1)
handleManualTrigger(18-127)
server/workflow-tools/registry.ts (1)
server/workflow-tools/types.ts (2)
ToolRegistry(33-35)defaultToolConfig(20-20)
server/workflow-tools/scheduler-trigger.ts (3)
server/workflow-tools/types.ts (4)
WorkflowTool(23-30)defaultToolConfig(20-20)WorkflowContext(5-9)ToolExecutionResult(12-18)server/db/schema/workflows.ts (4)
ToolType(483-483)ToolCategory(484-484)ToolExecutionStatus(485-485)SelectWorkflowTemplate(256-258)server/execution-engine/message-queue.ts (2)
ExecutionRequest(17-22)messageQueue(289-289)
server/execution-engine/execution-engine-queue.ts (3)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/types.ts (2)
ExecutionPacket(70-77)StepExecutionResult(80-88)server/db/schema/workflows.ts (3)
workflowExecution(153-178)WorkflowStatus(481-481)workflowStepExecution(181-210)
server/db/workflow.ts (2)
server/db/schema/workflows.ts (3)
workflowTemplate(71-98)selectWorkflowTemplateSchema(233-233)workflowStepTemplate(101-128)server/types.ts (1)
TxnOrClient(340-340)
server/execution-engine/step-executor.ts (5)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/execution-engine/types.ts (2)
ExecutionPacket(70-77)StepExecutionResult(80-88)server/db/schema/workflows.ts (6)
ToolExecutionStatus(485-485)ToolType(483-483)workflowStepExecution(181-210)WorkflowStatus(481-481)toolExecution(213-230)SelectWorkflowStepExecution(266-268)server/workflow-tools/types.ts (2)
WorkflowContext(5-9)ToolExecutionResult(12-18)server/workflow-tools/registry.ts (1)
getTool(39-45)
server/api/workflow-template.ts (8)
server/logger/index.ts (3)
getLoggerWithChild(192-200)Subsystem(15-15)getLogger(36-93)server/workflow-tools/registry.ts (3)
getSupportedToolTypes(53-55)getToolCategory(59-62)getTool(39-45)server/db/schema/workflows.ts (4)
ToolType(483-483)workflowTemplate(71-98)workflowStepTemplate(101-128)SelectWorkflowTemplate(256-258)server/db/user.ts (1)
getUserFromJWT(488-500)server/db/schema/userWorkflowPermissions.ts (1)
insertUserWorkflowPermissionSchema(56-62)server/db/userWorkflowPermissions.ts (1)
grantUserWorkflowPermission(41-57)server/db/workflow.ts (1)
getWorkflowTemplateByIdWithPermissionCheck(68-109)server/execution-engine/triggers.ts (1)
handleTemplateStateChange(130-206)
server/server.ts (4)
server/api/workflow-template.ts (8)
GetAvailableToolTypesApi(34-59)GetToolTypeSchemaApi(62-88)createTemplateSchema(104-109)CreateTemplateApi(166-213)GetTemplateApi(269-342)UpdateTemplateApi(216-266)DeleteTemplateApi(345-385)HandleStateChangeTemplateApi(387-459)server/api/workflow-template-step.ts (8)
addTemplateStepSchema(27-48)AddTemplateStepApiHandler(90-280)updateTemplateStepSchema(50-71)UpdateTemplateStepApiHandler(283-501)deleteTemplateStepSchema(73-75)DeleteTemplateStepApiHandler(503-663)deleteLinkSchema(77-87)deleteLink(666-810)server/api/workflow-execution.ts (5)
ExecuteTemplateHandler(25-58)GetWorkflowExecutionDetailsApi(144-347)StopExecutionApi(62-88)GetEngineHealthApi(91-112)HandleManualTrigger(115-141)server/execution-engine/message-queue.ts (1)
messageQueue(289-289)
server/execution-engine/workflow-executor.ts (4)
server/logger/index.ts (2)
getLogger(36-93)Subsystem(15-15)server/db/schema/workflows.ts (6)
workflowExecution(153-178)WorkflowStatus(481-481)workflowTemplate(71-98)workflowStepTemplate(101-128)workflowStepExecution(181-210)ToolCategory(484-484)server/execution-engine/types.ts (1)
ExecutionPacket(70-77)server/execution-engine/execution-engine-queue.ts (1)
sendExecutionPacket(93-118)
🔇 Additional comments (2)
server/db/schema/workflows.ts (1)
215-217: Verify optional workflow tool ID is intentional.The
workflowToolIdfield on thetoolExecutiontable has been made optional (lines 216-217 comment out the.notNull()and.references()). This means tool executions can now exist without a reference to a workflow tool. Ensure this is intentional and aligns with your execution model where tools might be defined inline on steps rather than as separate tool entities.If this change supports inline tool definitions (via
toolCategory,toolType,toolConfigon steps), please confirm that all execution paths handle the null case correctly.server/server.ts (1)
2422-2424: LGTM! Clean message queue initialization.The dynamic import and await pattern for message queue initialization is well-structured and ensures the queue is properly set up before the server starts processing requests.
| const [teamplateStep] = await trx | ||
| .select({ | ||
| workflowTemplateId: workflowStepTemplate.workflowTemplateId, | ||
| }) | ||
| .from(workflowStepTemplate) | ||
| .where(eq(workflowStepTemplate.id, templateStepId)) | ||
| .limit(1) | ||
| const template = await getWorkflowTemplateByIdWithPermissionCheck( | ||
| trx, | ||
| teamplateStep.workflowTemplateId, | ||
| workspaceId, | ||
| userId | ||
| ) | ||
| if (!template) { | ||
| return false | ||
| } | ||
| return true | ||
| } |
There was a problem hiding this comment.
Guard against missing template step before dereferencing.
If no workflow step template exists for the supplied templateStepId, teamplateStep is undefined and teamplateStep.workflowTemplateId on Line 127 throws, turning an access check into a 500 instead of a clean “no access” result. Please bail out when the lookup returns nothing:
- const [teamplateStep] = await trx
+ const [templateStep] = await trx
.select({
workflowTemplateId: workflowStepTemplate.workflowTemplateId,
})
.from(workflowStepTemplate)
.where(eq(workflowStepTemplate.id, templateStepId))
.limit(1)
- const template = await getWorkflowTemplateByIdWithPermissionCheck(
- trx,
- teamplateStep.workflowTemplateId,
- workspaceId,
- userId
- )
+ if (!templateStep) {
+ return false
+ }
+
+ const template = await getWorkflowTemplateByIdWithPermissionCheck(
+ trx,
+ templateStep.workflowTemplateId,
+ workspaceId,
+ userId
+ )This preserves the intended boolean contract and prevents an avoidable runtime crash.
🤖 Prompt for AI Agents
In server/db/workflow.ts around lines 118 to 135, the code dereferences
teamplateStep.workflowTemplateId without checking whether teamplateStep is
defined; add a guard immediately after the DB select (e.g. if (!teamplateStep)
return false) so the function returns false when no template step is found
instead of throwing, then proceed to call
getWorkflowTemplateByIdWithPermissionCheck only when teamplateStep exists.
| // Use boss.schedule for cron expressions | ||
| await this.boss.schedule(this.INCOMING_QUEUE, cron, message, { | ||
| tz: 'UTC', | ||
| key : request.templateId | ||
| }) | ||
| Logger.info(`Scheduled execution with cron '${cron}' and correlation ID: ${correlationId}`) | ||
| } else if (start_at) { | ||
| // Use boss.sendAfter for specific start time | ||
| await this.boss.sendAfter(this.INCOMING_QUEUE, message, {}, start_at) | ||
| Logger.info(`Scheduled execution for ${start_at} with correlation ID: ${correlationId}`) | ||
| } else { | ||
| // Use boss.send for immediate execution | ||
| await this.boss.send(this.INCOMING_QUEUE, message) | ||
| Logger.info(`Published immediate execution message with correlation ID: ${correlationId}`) | ||
| } | ||
|
|
||
| return correlationId | ||
| } | ||
|
|
||
| async schedule(request: ExecutionRequest, cron: string): Promise<string> { | ||
| return this.publishExecution(request, undefined, cron) | ||
| } | ||
|
|
||
| async unschedule(templateId: string): Promise<void> { | ||
| if (!this.checkInitialized()) { | ||
| throw new Error("Message queue not initialized") | ||
| } | ||
|
|
||
| await this.boss.unschedule(this.INCOMING_QUEUE, templateId) | ||
| Logger.info(`Unscheduled executions for template ID: ${templateId}`) |
There was a problem hiding this comment.
🧩 Analysis chain
Unique cron schedules are overwritten and can’t be unscheduled per template.
Line 162 reuses the same queue name for every boss.schedule call, but per pg-boss scheduling docs (schedule(name, cron, ...) will update an existing schedule when the name matches), so activating a second template replaces the first instead of creating a distinct cron job. On Line 188 we then call boss.unschedule(this.INCOMING_QUEUE, templateId), yet unschedule(name) only looks at the schedule name; the extra templateId is ignored, meaning this deletes whichever singleton schedule exists for that queue and still can’t target a specific template. Net effect: only one cron template can exist at a time, and unscheduling the wrong template will silently drop the remaining one.
Please generate a per-template schedule identifier (e.g. ${this.INCOMING_QUEUE}:${request.templateId}) and use the same name for both schedule and unschedule. Also drop the unused key option so the intent is unambiguous. For example:
- await this.boss.schedule(this.INCOMING_QUEUE, cron, message, {
- tz: 'UTC',
- key : request.templateId
- })
+ const scheduleName = `${this.INCOMING_QUEUE}:${request.templateId}`
+ await this.boss.schedule(scheduleName, cron, message, { tz: 'UTC' })
...
- await this.boss.unschedule(this.INCOMING_QUEUE, templateId)
+ const scheduleName = `${this.INCOMING_QUEUE}:${templateId}`
+ await this.boss.unschedule(scheduleName)Without this change, cron-triggered workflows will either clobber each other or disappear unexpectedly.
I'll verify the review comment by examining the actual code and checking the pg-boss library documentation.
🏁 Script executed:
#!/bin/bash
# Find and examine the message-queue.ts file
find . -name "message-queue.ts" -type f 2>/dev/null | head -20Length of output: 42
🏁 Script executed:
#!/bin/bash
# Check the actual implementation around the lines mentioned
if [ -f "server/execution-engine/message-queue.ts" ]; then
sed -n '150,200p' "server/execution-engine/message-queue.ts"
fiLength of output: 1694
🏁 Script executed:
#!/bin/bash
# Search for pg-boss imports and version
rg -n "pg-boss|pgboss" --type=ts --type=json -B2 -A2Length of output: 9389
🏁 Script executed:
#!/bin/bash
# Check package.json for pg-boss version
if [ -f "package.json" ]; then
jq '.dependencies, .devDependencies' package.json | grep -A5 -B5 "pg-boss"
fiLength of output: 0
🌐 Web query:
What is the pg-boss schedule() and unschedule() method behavior? Does schedule() overwrite if same name is used? Does unschedule() accept a templateId parameter?
💡 Result:
Short answer:
-
pg-boss schedule(name, cron, data, options) registers a named schedule; if a schedule with the same name already exists it is updated/replaced (i.e. it overwrites the existing schedule). [1][2]
-
unschedule(...) for pg-boss takes the schedule name (queue/name) to remove the schedule. It does not accept a templateId parameter. Use the name returned/used when scheduling. [1][2]
Sources: pg-boss API/docs and changelog. [1] (gist.github.com)
Citations:
🌐 Web query:
pg-boss schedule method documentation parameters
💡 Result:
Summary — pg-boss schedule(name, cron, data, options)
- signature: schedule(name, cron, data, options). [1]
- name (string) — required: the job/queue name to schedule. [1]
- cron (string) — required: cron expression (normally 5-field minute precision; pg-boss checks schedules periodically). [1][3]
- data (object) — optional: payload that will be passed to the created job. [1]
- options (object) — optional: accepts the same options as send() (e.g., retention*, startAfter/startIn, singletonKey/singletonSeconds/… and other job/send-level controls) plus an optional tz property to set timezone (defaults to UTC). [1][4]
Notes
- If a schedule already exists for the given name it is updated. [1]
- Scheduling relies on at least one running instance; scheduling checks are debounced (minute-level behavior noted in changelog/issues). [1][3]
Sources
[1] pg-boss scheduling docs (schedule API).
[2] pg-boss GitHub repository (project README).
[3] pg-boss changelog / releases (cron behavior).
[4] pg-boss send/options reference (examples of send() options).
Unique cron schedules are overwritten and can't be unscheduled per template.
The pg-boss schedule() method overwrites existing schedules when the same name is used, and unschedule() accepts only the schedule name parameter and does not use templateId. Your code reuses this.INCOMING_QUEUE for all template schedules (line 162), causing subsequent scheduling calls to replace prior ones. When unscheduling on line 188, the templateId parameter is ignored by pg-boss, so unscheduling any template silently removes whichever singleton schedule exists regardless of which template is actually targeted.
Generate per-template schedule names using ${this.INCOMING_QUEUE}:${request.templateId} for both schedule() and unschedule() calls, and remove the unused key option:
- await this.boss.schedule(this.INCOMING_QUEUE, cron, message, {
- tz: 'UTC',
- key : request.templateId
- })
+ const scheduleName = `${this.INCOMING_QUEUE}:${request.templateId}`
+ await this.boss.schedule(scheduleName, cron, message, { tz: 'UTC' })
...
- await this.boss.unschedule(this.INCOMING_QUEUE, templateId)
+ const scheduleName = `${this.INCOMING_QUEUE}:${templateId}`
+ await this.boss.unschedule(scheduleName)Without this fix, only one cron-triggered workflow template can exist at a time, and unscheduling will silently clobber all others.
🤖 Prompt for AI Agents
In server/execution-engine/message-queue.ts around lines 160 to 189, the code
uses a single schedule name this.INCOMING_QUEUE for all cron schedules and
passes a redundant key option, which causes pg-boss to overwrite schedules and
makes unschedule(templateId) ineffective; change the schedule and unschedule
calls to use a per-template schedule name like
`${this.INCOMING_QUEUE}:${request.templateId}` (construct the name before
calling boss.schedule and boss.unschedule), remove the unused key option from
the boss.schedule call, and in unschedule(templateId) build the same
per-template schedule name and pass it to boss.unschedule so each template has
its own cron schedule that can be individually removed.
| }) | ||
| .where(eq(workflowStepExecution.id, stepId)) | ||
|
|
||
| Logger.info(`📊 Updated step ${stepId} status to ${status}`) |
There was a problem hiding this comment.
Undefined variable reference.
Line 84 logs status in the success message, but there is no status variable defined in the updateFailedStepStatus method scope. This will cause a runtime error. The method only ever sets the status to WorkflowStatus.FAILED.
Apply this diff:
- Logger.info(`📊 Updated step ${stepId} status to ${status}`)
+ Logger.info(`📊 Updated step ${stepId} status to FAILED`)Or define a local variable if status might vary in the future.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Logger.info(`📊 Updated step ${stepId} status to ${status}`) | |
| Logger.info(`📊 Updated step ${stepId} status to FAILED`) |
🤖 Prompt for AI Agents
In server/execution-engine/step-executor.ts around line 84,
updateFailedStepStatus logs a nonexistent variable `status`, causing a runtime
error; replace the reference with the actual status value being set
(WorkflowStatus.FAILED) or define a local const status = WorkflowStatus.FAILED
before the log and use that variable so the log references a defined value and
matches the state change.
|
|
||
| Logger.info(`📊 Updated step ${stepId} status to ${status}`) | ||
| } catch (error) { | ||
| Logger.error(error, `Failed to update step ${stepId} status to ${status}`) |
There was a problem hiding this comment.
Another undefined variable reference in error log.
Line 86 also references the undefined status variable in the error log within the catch block.
Apply this diff:
- Logger.error(error, `Failed to update step ${stepId} status to ${status}`)
+ Logger.error(error, `Failed to update step ${stepId} status to FAILED`)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Logger.error(error, `Failed to update step ${stepId} status to ${status}`) | |
| Logger.error(error, `Failed to update step ${stepId} status to FAILED`) |
🤖 Prompt for AI Agents
In server/execution-engine/step-executor.ts around line 86, the catch block uses
an undefined variable `status` in the Logger.error call; replace that reference
with the actual in-scope status variable (for example the local variable that
holds the intended status such as newStatus, stepStatus, statusToSet, or the
step.status property), or if no such variable exists, remove the status
interpolation and log only the stepId and error; update the Logger.error
invocation to reference the correct variable name so the error message compiles
and shows the intended status.
| import type { ExecutionPacket } from "./types" | ||
| import { ToolCategory } from "@/db/schema/workflows" | ||
|
|
||
| const Logger = getLogger(Subsystem.WorkflowApi) |
There was a problem hiding this comment.
Inconsistent logger subsystem.
The logger uses Subsystem.WorkflowApi but this is part of the execution engine. For consistency with other execution engine modules (e.g., server/execution-engine/step-executor.ts line 10 uses Subsystem.ExecutionEngine), this should also use Subsystem.ExecutionEngine.
Apply this diff:
-const Logger = getLogger(Subsystem.WorkflowApi)
+const Logger = getLogger(Subsystem.ExecutionEngine)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const Logger = getLogger(Subsystem.WorkflowApi) | |
| const Logger = getLogger(Subsystem.ExecutionEngine) |
🤖 Prompt for AI Agents
In server/execution-engine/workflow-executor.ts around line 16, the logger is
created with Subsystem.WorkflowApi which is inconsistent with other
execution-engine modules; update the getLogger call to use
Subsystem.ExecutionEngine instead (e.g., replace Subsystem.WorkflowApi with
Subsystem.ExecutionEngine), ensure the Subsystem symbol is imported/available in
this file, and run linters/tests to confirm no import or naming issues.
| success: true, | ||
| data: { | ||
| executionId, | ||
| status: "STOPPED", |
There was a problem hiding this comment.
will this be stop_requested?Since we cannot stop instantly.
| templateState: workflowTemplate.state, | ||
| }) | ||
| .from(workflowExecution) | ||
| .leftJoin(workflowTemplate, eq(workflowExecution.workflowTemplateId, workflowTemplate.id)) |
There was a problem hiding this comment.
should we avoid this left join. SInce the intermediate table will be very large?
Description
Testing
Additional Notes
Summary by CodeRabbit
New Features
Chores
expireInHourstoexpireInSecondsandmonitorStateIntervalMinutestomonitorIntervalSeconds.