From dd55201ee601af25be0186d9183c48adb3b69d3d Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Tue, 19 Aug 2025 10:20:00 +0100 Subject: [PATCH] refactor(core): Split `WorkflowExecute.runNode` into smaller methods (#17864) --- packages/core/eslint.config.mjs | 2 +- .../workflow-execute-run-node.test.ts | 1077 +++++++++++++++++ .../src/execution-engine/workflow-execute.ts | 476 +++++--- 3 files changed, 1390 insertions(+), 165 deletions(-) create mode 100644 packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts diff --git a/packages/core/eslint.config.mjs b/packages/core/eslint.config.mjs index 5729f09117..d3539964b1 100644 --- a/packages/core/eslint.config.mjs +++ b/packages/core/eslint.config.mjs @@ -3,7 +3,7 @@ import { nodeConfig } from '@n8n/eslint-config/node'; export default defineConfig( nodeConfig, - globalIgnores(['bin/*.js', 'nodes-testing/*.ts']), + globalIgnores(['bin/*.js', 'nodes-testing/*.ts', 'coverage/*']), { rules: { // TODO: Lower the complexity threshold diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts new file mode 100644 index 0000000000..41e4eebed4 --- /dev/null +++ b/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts @@ -0,0 +1,1077 @@ +/** + * Tests for the actual WorkflowExecute.runNode method + * These tests ensure the real implementation behavior is preserved + * during refactoring + * They have been generated by claude code, but have been proven to work + * via test coverage reports and mutation testing. + */ + +// Mock all external dependencies first, before any imports +jest.mock('@n8n/config', () => ({ + GlobalConfig: jest.fn().mockImplementation(() => ({ + sentry: { backendDsn: '' }, + })), +})); + +jest.mock('@n8n/di', () => ({ + Container: { + get: jest.fn(), + }, + Service: () => (target: unknown) => target, +})); + +jest.mock('@/errors/error-reporter', () => ({ + ErrorReporter() { + return { + error: jest.fn(), + }; + }, +})); + +const mockIsJsonCompatible = jest.fn().mockReturnValue({ isValid: true }); +jest.mock('@/utils/is-json-compatible', () => ({ + isJsonCompatible: mockIsJsonCompatible, +})); + +jest.mock('../node-execution-context', () => ({ + ExecuteContext: jest.fn().mockImplementation(() => ({ + hints: [], + })), + PollContext: jest.fn().mockImplementation(() => ({})), +})); + +jest.mock('../triggers-and-pollers', () => ({ + TriggersAndPollers: jest.fn(), +})); + +jest.mock('../routing-node', () => ({ + RoutingNode: jest.fn().mockImplementation(() => ({ + runNode: jest.fn().mockResolvedValue([[{ json: { routed: 'result' } }]]), + })), +})); + +jest.mock('@/node-execute-functions', () => ({ + getExecuteTriggerFunctions: jest.fn(), +})); + +// Now import the real classes +import { GlobalConfig } from '@n8n/config'; +import { Container } from '@n8n/di'; +import { mock } from 'jest-mock-extended'; +import type { + ExecutionBaseError, + IExecuteData, + INode, + INodeType, + IRunExecutionData, + ITaskDataConnections, + IWorkflowExecuteAdditionalData, + Workflow, +} from 'n8n-workflow'; +import { NodeApiError, NodeOperationError, Node } from 'n8n-workflow'; + +import { ExecuteContext, PollContext } from '../node-execution-context'; +import { RoutingNode } from '../routing-node'; +import { TriggersAndPollers } from '../triggers-and-pollers'; +import { WorkflowExecute } from '../workflow-execute'; + +const mockContainer = Container as jest.Mocked; +const mockExecuteContext = ExecuteContext as jest.MockedClass; +const mockPollContext = PollContext as jest.MockedClass; +const mockRoutingNode = RoutingNode as jest.MockedClass; + +describe('WorkflowExecute.runNode - Real Implementation', () => { + let workflowExecute: WorkflowExecute; + let mockWorkflow: jest.Mocked; + let mockAdditionalData: jest.Mocked; + let mockRunExecutionData: IRunExecutionData; + let mockNode: INode; + let mockNodeType: jest.Mocked; + let mockExecutionData: IExecuteData; + + beforeEach(() => { + jest.clearAllMocks(); + + // Setup Container mock for different dependencies + const mockTriggersAndPollersInstance = { + runTrigger: jest.fn(), + }; + const mockGlobalConfigInstance = { + sentry: { backendDsn: '' }, + }; + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return mockTriggersAndPollersInstance; + } + // Default fallback + return mockTriggersAndPollersInstance; + }); + + mockAdditionalData = mock({ + executionId: 'test-execution-id', + }); + + mockRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + pinData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + mockNode = { + id: 'test-node-id', + name: 'Test Node', + type: 'test-node', + typeVersion: 1, + position: [100, 200], + parameters: {}, + }; + + mockNodeType = mock({ + description: { + displayName: 'Test Node', + name: 'test-node', + group: ['transform'], + version: 1, + inputs: ['main'], + outputs: ['main'], + properties: [], + requestDefaults: undefined, // Explicitly set to undefined + }, + }); + + mockWorkflow = mock({ + nodeTypes: { + getByNameAndVersion: jest.fn().mockReturnValue(mockNodeType), + }, + settings: { + executionOrder: 'v1', + }, + }); + + mockExecutionData = { + node: mockNode, + data: { + main: [[{ json: { test: 'data' } }]], + }, + source: null, + }; + + workflowExecute = new WorkflowExecute(mockAdditionalData, 'manual', mockRunExecutionData); + }); + + describe('disabled node handling', () => { + it('should return undefined for disabled node with no input data', async () => { + const disabledNode = { ...mockNode, disabled: true }; + const executionData = { + ...mockExecutionData, + node: disabledNode, + data: { main: [] }, + }; + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(result).toEqual({ data: undefined }); + }); + + it('should return undefined for disabled node with null main input', async () => { + const disabledNode = { ...mockNode, disabled: true }; + const executionData = { + ...mockExecutionData, + node: disabledNode, + data: { main: [null] }, + }; + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(result).toEqual({ data: undefined }); + }); + + it('should pass through first main input data for disabled node', async () => { + const disabledNode = { ...mockNode, disabled: true }; + const inputData = [{ json: { test: 'passthrough' } }]; + const executionData = { + ...mockExecutionData, + node: disabledNode, + data: { main: [inputData] }, + }; + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(result).toEqual({ data: [inputData] }); + }); + }); + + describe('error handling for previously failed nodes', () => { + it('should rethrow NodeOperationError from previous execution', async () => { + const error = new NodeOperationError(mockNode, 'Test error'); + const runDataWithError = { + ...mockRunExecutionData, + resultData: { + ...mockRunExecutionData.resultData, + lastNodeExecuted: mockNode.name, + error, + }, + }; + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + runDataWithError, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow(error); + }); + + it('should rethrow NodeApiError from previous execution', async () => { + const error = new NodeApiError(mockNode, { message: 'API error' }); + const runDataWithError = { + ...mockRunExecutionData, + resultData: { + ...mockRunExecutionData.resultData, + lastNodeExecuted: mockNode.name, + error, + }, + }; + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + runDataWithError, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow(error); + }); + + it('should throw generic Error for other error types from previous execution', async () => { + const originalError = new NodeOperationError(mockNode, 'Generic error'); + const runDataWithError = { + ...mockRunExecutionData, + resultData: { + ...mockRunExecutionData.resultData, + lastNodeExecuted: mockNode.name, + error: originalError, + }, + }; + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + runDataWithError, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow('Generic error'); + }); + + it('should create new Error with message and stack for non-n8n error types from previous execution', async () => { + const originalError = { + name: 'SomeCustomError', + message: 'Custom error message', + stack: 'Custom error stack trace', + } as unknown as ExecutionBaseError; + const runDataWithError = { + ...mockRunExecutionData, + resultData: { + ...mockRunExecutionData.resultData, + lastNodeExecuted: mockNode.name, + error: originalError, + }, + }; + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + runDataWithError, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow(Error); + + // Verify the error has the correct message and stack + try { + await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + runDataWithError, + 0, + mockAdditionalData, + 'manual', + ); + } catch (error) { + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe('Custom error message'); + expect((error as Error).stack).toBe('Custom error stack trace'); + } + }); + }); + + describe('execute node type handling', () => { + it('should execute custom operation when available', async () => { + const mockData = [[{ json: { result: 'custom operation result' } }]]; + const mockCustomOperation = jest.fn().mockResolvedValue(mockData); + + // Create a node with parameters that match the custom operation + const customOpNode = { + ...mockNode, + parameters: { + resource: 'testResource', + operation: 'testOperation', + }, + }; + + // Create a nodeType with customOperations + const customOpNodeType = { + ...mockNodeType, + customOperations: { + testResource: { + testOperation: mockCustomOperation, + }, + }, + execute: undefined, // Make sure execute is not defined so custom operation is used + }; + + mockWorkflow.nodeTypes.getByNameAndVersion = jest.fn().mockReturnValue(customOpNodeType); + + const customOpExecutionData = { + ...mockExecutionData, + node: customOpNode, + }; + + const mockContextInstance = { hints: [] }; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + customOpExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockCustomOperation).toHaveBeenCalledWith(); + expect(result).toEqual({ data: mockData, hints: [] }); + }); + + it('should execute node with execute method and return data with hints', async () => { + const mockData = [[{ json: { result: 'test' } }]]; + const mockHints = [{ message: 'Test hint' }]; + mockNodeType.execute = jest.fn().mockResolvedValue(mockData); + + const mockContextInstance = { + hints: mockHints, + }; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockNodeType.execute).toHaveBeenCalled(); + expect(result).toEqual({ data: mockData, hints: mockHints }); + }); + + it('should execute Node class instance with execute method', async () => { + const mockData = [[{ json: { result: 'test' } }]]; + // Create a mock that extends Node to trigger instanceof Node check + const nodeInstance = Object.create(Node.prototype); + nodeInstance.execute = jest.fn().mockResolvedValue(mockData); + nodeInstance.description = { + displayName: 'Node Instance', + name: 'node-instance', + group: ['transform'], + version: 1, + inputs: ['main'], + outputs: ['main'], + properties: [], + requestDefaults: undefined, + }; + (mockWorkflow.nodeTypes.getByNameAndVersion as jest.Mock).mockReturnValue( + nodeInstance as INodeType, + ); + + const mockContextInstance = { hints: [] }; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(nodeInstance.execute).toHaveBeenCalledWith(mockContextInstance); + expect(result).toEqual({ data: mockData, hints: [] }); + }); + + it('should return undefined when no connection input data for execute nodes', async () => { + const executionData = { + ...mockExecutionData, + data: { main: [] }, // No input data + }; + + mockNodeType.execute = jest.fn(); + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(result).toEqual({ data: undefined }); + expect(mockNodeType.execute).not.toHaveBeenCalled(); + }); + + it('should report node execution with invalid JSON data when Sentry is configured', async () => { + // Create data that is not JSON compatible (circular reference) + const circularData: { json: { result: string; circular?: unknown } } = { + json: { result: 'test' }, + }; + circularData.json.circular = circularData; // Create circular reference + const invalidJsonData = [[circularData]]; + + mockNodeType.execute = jest.fn().mockResolvedValue(invalidJsonData); + + // Mock isJsonCompatible to return invalid for this test + mockIsJsonCompatible.mockReturnValueOnce({ + isValid: false, + errorPath: 'json.circular', + errorMessage: 'Circular reference detected', + }); + + // Mock GlobalConfig to have Sentry backend DSN + const mockGlobalConfigInstance = { + sentry: { backendDsn: 'https://test-sentry-dsn' }, + }; + + // Mock ErrorReporter + const mockErrorReporter = { + error: jest.fn(), + }; + + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return { runTrigger: jest.fn() }; + } + // Mock ErrorReporter + return mockErrorReporter; + }); + + const mockContextInstance = { hints: [] }; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + // Verify that ErrorReporter.error was called due to invalid JSON data + expect(mockErrorReporter.error).toHaveBeenCalledWith( + 'node execution returned incorrect data', + expect.objectContaining({ + shouldBeLogged: false, + extra: expect.objectContaining({ + nodeName: mockNode.name, + nodeType: mockNode.type, + nodeVersion: mockNode.typeVersion, + errorPath: 'json.circular', + errorMessage: 'Circular reference detected', + }), + }), + ); + + // Execution should still succeed despite the invalid data + expect(result).toEqual({ data: invalidJsonData, hints: [] }); + }); + + it('should handle close functions and their errors', async () => { + const mockData = [[{ json: { result: 'test' } }]]; + const closeFunction1 = jest.fn().mockResolvedValue(undefined); + const closeFunction2 = jest.fn().mockRejectedValue(new Error('Close error')); + + mockNodeType.execute = jest.fn().mockResolvedValue(mockData); + + const mockContextInstance = { + hints: [], + }; + + // Mock ExecuteContext constructor to capture closeFunctions array + mockExecuteContext.mockImplementation( + ( + _workflow, + _node, + _additionalData, + _mode, + _runExecutionData, + _runIndex, + _connectionInputData, + _inputData, + _executionData, + closeFunctions, + ) => { + // Add close functions to the array passed in + closeFunctions.push(closeFunction1, closeFunction2); + return mockContextInstance as unknown as ExecuteContext; + }, + ); + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow('Close error'); + + expect(closeFunction1).toHaveBeenCalled(); + expect(closeFunction2).toHaveBeenCalled(); + }); + + it('should throw ApplicationError when close function throws non-Error object', async () => { + const mockData = [[{ json: { result: 'test' } }]]; + const closeFunction1 = jest.fn().mockResolvedValue(undefined); + const closeFunction2 = jest.fn().mockRejectedValue('String error'); // Non-Error object to trigger line 1247 + + mockNodeType.execute = jest.fn().mockResolvedValue(mockData); + + const mockContextInstance = { + hints: [], + }; + + // Mock ExecuteContext constructor to capture closeFunctions array + mockExecuteContext.mockImplementation( + ( + _workflow, + _node, + _additionalData, + _mode, + _runExecutionData, + _runIndex, + _connectionInputData, + _inputData, + _executionData, + closeFunctions, + ) => { + // Add close functions to the array passed in + closeFunctions.push(closeFunction1, closeFunction2); + return mockContextInstance as unknown as ExecuteContext; + }, + ); + + await expect( + workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ), + ).rejects.toThrow("Error on execution node's close function(s)"); + + expect(closeFunction1).toHaveBeenCalled(); + expect(closeFunction2).toHaveBeenCalled(); + }); + }); + + describe('poll node type handling', () => { + it('should execute poll function in manual mode', async () => { + const mockData = [[{ json: { polled: 'data' } }]]; + mockNodeType.poll = jest.fn().mockResolvedValue(mockData); + mockNodeType.execute = undefined; + + const mockContextInstance = {}; + mockPollContext.mockImplementation(() => mockContextInstance as unknown as PollContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockPollContext).toHaveBeenCalledWith( + mockWorkflow, + mockNode, + mockAdditionalData, + 'manual', + 'manual', + ); + expect(mockNodeType.poll).toHaveBeenCalledWith(); + expect(result).toEqual({ data: mockData }); + }); + + it('should pass through input data for poll nodes in non-manual mode', async () => { + mockNodeType.poll = jest.fn(); + mockNodeType.execute = undefined; + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'trigger', // Non-manual mode + ); + + expect(mockNodeType.poll).not.toHaveBeenCalled(); + expect(result).toEqual({ data: mockExecutionData.data.main }); + }); + }); + + describe('trigger node type handling', () => { + it('should run trigger in manual mode and return data', async () => { + const mockTriggerData = [[{ json: { triggered: 'data' } }]]; + const mockTriggerResponse = { + manualTriggerResponse: Promise.resolve(mockTriggerData), + }; + + mockNodeType.trigger = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.webhook = undefined; + + const mockTriggersAndPollersInstance = { + runTrigger: jest.fn().mockResolvedValue(mockTriggerResponse), + }; + const mockGlobalConfigInstance = { + sentry: { backendDsn: '' }, + }; + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return mockTriggersAndPollersInstance; + } + return mockTriggersAndPollersInstance; + }); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(result).toEqual({ data: mockTriggerData }); + }); + + it('should return null data when trigger response is undefined in manual mode', async () => { + mockNodeType.trigger = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.webhook = undefined; + + const mockTriggersAndPollersInstance = { + runTrigger: jest.fn().mockResolvedValue(undefined), // Return undefined to trigger line 1277 + }; + const mockGlobalConfigInstance = { + sentry: { backendDsn: '' }, + }; + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return mockTriggersAndPollersInstance; + } + return mockTriggersAndPollersInstance; + }); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(result).toEqual({ data: null }); + }); + + it('should return null data and closeFunction when trigger response is empty in manual mode', async () => { + const mockCloseFunction = jest.fn(); + const mockTriggerResponse = { + manualTriggerResponse: Promise.resolve([]), // Empty response to trigger line 1301 + closeFunction: mockCloseFunction, + }; + + mockNodeType.trigger = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.webhook = undefined; + + const mockTriggersAndPollersInstance = { + runTrigger: jest.fn().mockResolvedValue(mockTriggerResponse), + }; + const mockGlobalConfigInstance = { + sentry: { backendDsn: '' }, + }; + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return mockTriggersAndPollersInstance; + } + return mockTriggersAndPollersInstance; + }); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(result).toEqual({ data: null, closeFunction: mockCloseFunction }); + }); + + it('should call manualTriggerFunction when defined in trigger response', async () => { + const mockTriggerData = [[{ json: { triggered: 'data' } }]]; + const mockManualTriggerFunction = jest.fn().mockResolvedValue(undefined); + const mockCloseFunction = jest.fn(); + const mockTriggerResponse = { + manualTriggerResponse: Promise.resolve(mockTriggerData), + manualTriggerFunction: mockManualTriggerFunction, // This will trigger line 1294 + closeFunction: mockCloseFunction, + }; + + mockNodeType.trigger = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.webhook = undefined; + + const mockTriggersAndPollersInstance = { + runTrigger: jest.fn().mockResolvedValue(mockTriggerResponse), + }; + const mockGlobalConfigInstance = { + sentry: { backendDsn: '' }, + }; + mockContainer.get.mockImplementation((token) => { + if (token === GlobalConfig) { + return mockGlobalConfigInstance; + } + if (token === TriggersAndPollers) { + return mockTriggersAndPollersInstance; + } + return mockTriggersAndPollersInstance; + }); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(mockManualTriggerFunction).toHaveBeenCalled(); // Verify line 1294 was executed + expect(result).toEqual({ data: mockTriggerData, closeFunction: mockCloseFunction }); + }); + + it('should pass through input data for trigger nodes in non-manual mode', async () => { + mockNodeType.trigger = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.webhook = undefined; + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'trigger', // Non-manual mode + ); + + expect(result).toEqual({ data: mockExecutionData.data.main }); + }); + }); + + describe('webhook node type handling', () => { + it('should pass through input data for non-declarative webhook nodes', async () => { + mockNodeType.webhook = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.trigger = undefined; + mockNodeType.description.requestDefaults = undefined; // Non-declarative + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(result).toEqual({ data: mockExecutionData.data.main }); + }); + + it('should execute declarative webhook nodes through routing node', async () => { + const mockData = [[{ json: { webhook: 'result' } }]]; + mockNodeType.webhook = jest.fn(); + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.trigger = undefined; + mockNodeType.description.requestDefaults = {}; // Declarative node + + const mockRoutingNodeInstance = { + runNode: jest.fn().mockResolvedValue(mockData), + }; + mockRoutingNode.mockImplementation(() => mockRoutingNodeInstance as unknown as RoutingNode); + + const mockContextInstance = {}; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockRoutingNode).toHaveBeenCalledWith(mockContextInstance, mockNodeType); + expect(mockRoutingNodeInstance.runNode).toHaveBeenCalled(); + expect(result).toEqual({ data: mockData }); + }); + }); + + describe('fallback routing node handling', () => { + it('should use routing node for nodes without specific execution methods', async () => { + const mockData = [[{ json: { routed: 'result' } }]]; + // Node with no execute, poll, trigger, or webhook methods + mockNodeType.execute = undefined; + mockNodeType.poll = undefined; + mockNodeType.trigger = undefined; + mockNodeType.webhook = undefined; + + const mockRoutingNodeInstance = { + runNode: jest.fn().mockResolvedValue(mockData), + }; + mockRoutingNode.mockImplementation(() => mockRoutingNodeInstance as unknown as RoutingNode); + + const mockContextInstance = {}; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + mockExecutionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + expect(mockRoutingNode).toHaveBeenCalledWith(mockContextInstance, mockNodeType); + expect(mockRoutingNodeInstance.runNode).toHaveBeenCalled(); + expect(result).toEqual({ data: mockData }); + }); + }); + + describe('executeOnce node handling', () => { + it('should slice input data to only first item when executeOnce is true', async () => { + const executeOnceNode = { ...mockNode, executeOnce: true }; + // Create input data with multiple connection types to trigger the slice logic in line 1183 + const inputData = { + main: [ + [{ json: { item: 1 } }, { json: { item: 2 } }, { json: { item: 3 } }], // This should be sliced to only first item + ], + ai_tool: [ + [{ json: { tool: 'a' } }, { json: { tool: 'b' } }], // This should also be sliced to only first item + ], + }; + const executionData = { + ...mockExecutionData, + node: executeOnceNode, + data: inputData, + }; + + let capturedInputData: ITaskDataConnections | undefined; + + mockNodeType.execute = jest.fn().mockResolvedValue([[{ json: { result: 'executeOnce' } }]]); + + const mockContextInstance = { hints: [] }; + mockExecuteContext.mockImplementation( + ( + _workflow, + _node, + _additionalData, + _mode, + _runExecutionData, + _runIndex, + _connectionInputData, + inputData, + _executionData, + _closeFunctions, + ) => { + // Capture the inputData that was passed to ExecuteContext + capturedInputData = inputData; + return mockContextInstance as unknown as ExecuteContext; + }, + ); + + await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + // Verify that the slice logic was applied correctly to ALL connection types + expect(capturedInputData).toBeDefined(); + if (capturedInputData) { + // Main connection should be sliced to only first item + expect(capturedInputData.main).toHaveLength(1); + expect(capturedInputData.main[0]).toHaveLength(1); + expect(capturedInputData.main[0]![0]).toEqual({ json: { item: 1 } }); + + // AI tool connection should also be sliced to only first item + expect(capturedInputData.ai_tool).toHaveLength(1); + expect(capturedInputData.ai_tool[0]).toHaveLength(1); + expect(capturedInputData.ai_tool[0]![0]).toEqual({ json: { tool: 'a' } }); + } + expect(mockNodeType.execute).toHaveBeenCalled(); + }); + }); + + describe('execution order and input data handling', () => { + it('should use first main input for v1 execution order when forceInputNodeExecution is false', async () => { + mockWorkflow.settings.executionOrder = 'v1'; // v1 means forceInputNodeExecution = false + const inputData = [ + [], // Empty first input + [{ json: { item: 2 } }], // Non-empty second input + [{ json: { item: 3 } }], + ]; + const executionData = { + ...mockExecutionData, + data: { main: inputData }, + }; + + mockNodeType.execute = jest.fn().mockResolvedValue([[{ json: { result: 'test' } }]]); + + const mockContextInstance = { hints: [] }; + mockExecuteContext.mockImplementation(() => mockContextInstance as unknown as ExecuteContext); + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + // Should find the first non-empty input and execute + expect(result).toEqual({ data: [[{ json: { result: 'test' } }]], hints: [] }); + }); + + it('should use first main input for v0 execution order when forceInputNodeExecution is true', async () => { + mockWorkflow.settings.executionOrder = 'v0'; // v0 means forceInputNodeExecution = true + const inputData = [ + [], // Empty first input + [{ json: { item: 2 } }], // Non-empty second input + [{ json: { item: 3 } }], + ]; + const executionData = { + ...mockExecutionData, + data: { main: inputData }, + }; + + mockNodeType.execute = jest.fn().mockResolvedValue([[{ json: { result: 'test' } }]]); + + const result = await workflowExecute.runNode( + mockWorkflow, + executionData, + mockRunExecutionData, + 0, + mockAdditionalData, + 'manual', + ); + + // Should return undefined because first input is empty and we use first input in v0 + expect(result).toEqual({ data: undefined }); + }); + }); +}); diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 6dd68bccb5..1039bd8b5c 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1089,54 +1089,38 @@ export class WorkflowExecute { return customOperation; } - /** Executes the given node */ - // eslint-disable-next-line complexity - async runNode( - workflow: Workflow, - executionData: IExecuteData, - runExecutionData: IRunExecutionData, - runIndex: number, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - abortSignal?: AbortSignal, - ): Promise { - const { node } = executionData; - let inputData = executionData.data; - - if (node.disabled === true) { - // If node is disabled simply pass the data through - // return NodeRunHelpers. - if (inputData.hasOwnProperty('main') && inputData.main.length > 0) { - // If the node is disabled simply return the data from the first main input - if (inputData.main[0] === null) { - return { data: undefined }; - } - return { data: [inputData.main[0]] }; + /** + * Handles execution of disabled nodes by passing through input data + */ + private handleDisabledNode(inputData: ITaskDataConnections): IRunNodeResponse { + if (inputData.hasOwnProperty('main') && inputData.main.length > 0) { + // If the node is disabled simply return the data from the first main input + if (inputData.main[0] === null) { + return { data: undefined }; } - return { data: undefined }; + return { data: [inputData.main[0]] }; } + return { data: undefined }; + } - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - const isDeclarativeNode = nodeType.description.requestDefaults !== undefined; - - const customOperation = this.getCustomOperation(node, nodeType); - - let connectionInputData: INodeExecutionData[] = []; + private prepareConnectionInputData( + workflow: Workflow, + nodeType: INodeType, + customOperation: ReturnType, + inputData: ITaskDataConnections, + ): INodeExecutionData[] | null { if ( nodeType.execute || customOperation || (!nodeType.poll && !nodeType.trigger && !nodeType.webhook) ) { - // Only stop if first input is empty for execute runs. For all others run anyways - // because then it is a trigger node. As they only pass data through and so the input-data - // becomes output-data it has to be possible. - - if (inputData.main?.length > 0) { - // We always use the data of main input and the first input for execute - connectionInputData = inputData.main[0] as INodeExecutionData[]; + if (!inputData.main?.length) { + return null; } + // We always use the data of main input and the first input for execute + let connectionInputData = inputData.main[0] as INodeExecutionData[]; + const forceInputNodeExecution = workflow.settings.executionOrder !== 'v1'; if (!forceInputNodeExecution) { // If the nodes do not get force executed data of some inputs may be missing @@ -1150,11 +1134,20 @@ export class WorkflowExecute { } if (connectionInputData.length === 0) { - // No data for node so return - return { data: undefined }; + return null; } + + return connectionInputData; } + // For poll, trigger, and webhook nodes, we don't need to process input data + return []; + } + + /** + * Handles re-throwing errors from previous node execution attempts + */ + private rethrowLastNodeError(runExecutionData: IRunExecutionData, node: INode): void { if ( runExecutionData.resultData.lastNodeExecuted === node.name && runExecutionData.resultData.error !== undefined @@ -1173,7 +1166,12 @@ export class WorkflowExecute { error.stack = runExecutionData.resultData.error.stack; throw error; } + } + /** + * Handles executeOnce logic by limiting input data to first item only + */ + private handleExecuteOnce(node: INode, inputData: ITaskDataConnections): ITaskDataConnections { if (node.executeOnce === true) { // If node should be executed only once so use only the first input item const newInputData: ITaskDataConnections = {}; @@ -1182,14 +1180,257 @@ export class WorkflowExecute { return input && input.slice(0, 1); }); } - inputData = newInputData; + return newInputData; + } + return inputData; + } + + /** + * Validates execution data for JSON compatibility and reports issues to Sentry + */ + private reportJsonIncompatibleOutput( + data: INodeExecutionData[][] | null, + workflow: Workflow, + node: INode, + ): void { + if (Container.get(GlobalConfig).sentry.backendDsn) { + // If data is not json compatible then log it as incorrect output + // Does not block the execution from continuing + const jsonCompatibleResult = isJsonCompatible(data, new Set(['pairedItem'])); + if (!jsonCompatibleResult.isValid) { + Container.get(ErrorReporter).error('node execution returned incorrect data', { + shouldBeLogged: false, + extra: { + nodeName: node.name, + nodeType: node.type, + nodeVersion: node.typeVersion, + workflowId: workflow.id, + workflowName: workflow.name ?? 'Unnamed workflow', + executionId: this.additionalData.executionId ?? 'unsaved-execution', + errorPath: jsonCompatibleResult.errorPath, + errorMessage: jsonCompatibleResult.errorMessage, + }, + }); + } + } + } + + private async executeNode( + workflow: Workflow, + node: INode, + nodeType: INodeType, + customOperation: ReturnType, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + runExecutionData: IRunExecutionData, + runIndex: number, + connectionInputData: INodeExecutionData[], + inputData: ITaskDataConnections, + executionData: IExecuteData, + abortSignal?: AbortSignal, + ): Promise { + const closeFunctions: CloseFunction[] = []; + const context = new ExecuteContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executionData, + closeFunctions, + abortSignal, + ); + + let data: INodeExecutionData[][] | null = null; + + if (customOperation) { + data = await customOperation.call(context); + } else if (nodeType.execute) { + data = + nodeType instanceof Node + ? await nodeType.execute(context) + : await nodeType.execute.call(context); } - if (nodeType.execute || customOperation) { - const closeFunctions: CloseFunction[] = []; - const context = new ExecuteContext( + this.reportJsonIncompatibleOutput(data, workflow, node); + + const closeFunctionsResults = await Promise.allSettled( + closeFunctions.map(async (fn) => await fn()), + ); + + const closingErrors = closeFunctionsResults + .filter((result): result is PromiseRejectedResult => result.status === 'rejected') + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + .map((result) => result.reason); + + if (closingErrors.length > 0) { + if (closingErrors[0] instanceof Error) throw closingErrors[0]; + throw new ApplicationError("Error on execution node's close function(s)", { + extra: { nodeName: node.name }, + tags: { nodeType: node.type }, + cause: closingErrors, + }); + } + + return { data, hints: context.hints }; + } + + /** + * Executes a poll node + */ + private async executePollNode( + workflow: Workflow, + node: INode, + nodeType: INodeType, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + inputData: ITaskDataConnections, + ): Promise { + if (mode === 'manual') { + // In manual mode run the poll function + const context = new PollContext(workflow, node, additionalData, mode, 'manual'); + return { data: await nodeType.poll!.call(context) }; + } + // In any other mode pass data through as it already contains the result of the poll + return { data: inputData.main as INodeExecutionData[][] }; + } + + /** + * Executes a trigger node + */ + private async executeTriggerNode( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + inputData: ITaskDataConnections, + abortSignal?: AbortSignal, + ): Promise { + if (mode === 'manual') { + // In manual mode start the trigger + const triggerResponse = await Container.get(TriggersAndPollers).runTrigger( workflow, node, + NodeExecuteFunctions.getExecuteTriggerFunctions, + additionalData, + mode, + 'manual', + ); + + if (triggerResponse === undefined) { + return { data: null }; + } + + let closeFunction; + if (triggerResponse.closeFunction) { + // In manual mode we return the trigger closeFunction. That allows it to be called directly + // but we do not have to wait for it to finish. That is important for things like queue-nodes. + // There the full close will may be delayed till a message gets acknowledged after the execution. + // If we would not be able to wait for it to close would it cause problems with "own" mode as the + // process would be killed directly after it and so the acknowledge would not have been finished yet. + closeFunction = triggerResponse.closeFunction; + + // Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers + abortSignal?.addEventListener('abort', closeFunction); + } + + if (triggerResponse.manualTriggerFunction !== undefined) { + // If a manual trigger function is defined call it and wait till it did run + await triggerResponse.manualTriggerFunction(); + } + + const response = await triggerResponse.manualTriggerResponse!; + + if (response.length === 0) { + return { data: null, closeFunction }; + } + + return { data: response, closeFunction }; + } + // For trigger nodes in any mode except "manual" do we simply pass the data through + return { data: inputData.main as INodeExecutionData[][] }; + } + + private async executeDeclarativeNodeInTest( + workflow: Workflow, + node: INode, + nodeType: INodeType, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + runExecutionData: IRunExecutionData, + runIndex: number, + connectionInputData: INodeExecutionData[], + inputData: ITaskDataConnections, + executionData: IExecuteData, + ): Promise { + // NOTE: This block is only called by nodes tests. + // In the application, declarative nodes get assigned a `.execute` method in NodeTypes. + const context = new ExecuteContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executionData, + [], + ); + const routingNode = new RoutingNode(context, nodeType); + const data = await routingNode.runNode(); + return { data }; + } + + /** + * Figures out the node type and state and calls the right execution + * implementation. + */ + async runNode( + workflow: Workflow, + executionData: IExecuteData, + runExecutionData: IRunExecutionData, + runIndex: number, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + abortSignal?: AbortSignal, + ): Promise { + const { node } = executionData; + let inputData = executionData.data; + + if (node.disabled === true) { + return this.handleDisabledNode(inputData); + } + + const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + const customOperation = this.getCustomOperation(node, nodeType); + + const connectionInputData = this.prepareConnectionInputData( + workflow, + nodeType, + customOperation, + inputData, + ); + + if (connectionInputData === null) { + return { data: undefined }; + } + + this.rethrowLastNodeError(runExecutionData, node); + + inputData = this.handleExecuteOnce(node, inputData); + + const isDeclarativeNode = nodeType.description.requestDefaults !== undefined; + + if (nodeType.execute || customOperation) { + return await this.executeNode( + workflow, + node, + nodeType, + customOperation, additionalData, mode, runExecutionData, @@ -1197,137 +1438,44 @@ export class WorkflowExecute { connectionInputData, inputData, executionData, - closeFunctions, abortSignal, ); + } - let data; + if (nodeType.poll) { + return await this.executePollNode(workflow, node, nodeType, additionalData, mode, inputData); + } - if (customOperation) { - data = await customOperation.call(context); - } else if (nodeType.execute) { - data = - nodeType instanceof Node - ? await nodeType.execute(context) - : await nodeType.execute.call(context); - } - - if (Container.get(GlobalConfig).sentry.backendDsn) { - // If data is not json compatible then log it as incorrect output - // Does not block the execution from continuing - const jsonCompatibleResult = isJsonCompatible(data, new Set(['pairedItem'])); - if (!jsonCompatibleResult.isValid) { - Container.get(ErrorReporter).error('node execution returned incorrect data', { - shouldBeLogged: false, - extra: { - nodeName: node.name, - nodeType: node.type, - nodeVersion: node.typeVersion, - workflowId: workflow.id, - workflowName: workflow.name ?? 'Unnamed workflow', - executionId: this.additionalData.executionId ?? 'unsaved-execution', - errorPath: jsonCompatibleResult.errorPath, - errorMessage: jsonCompatibleResult.errorMessage, - }, - }); - } - } - - const closeFunctionsResults = await Promise.allSettled( - closeFunctions.map(async (fn) => await fn()), + if (nodeType.trigger) { + return await this.executeTriggerNode( + workflow, + node, + additionalData, + mode, + inputData, + abortSignal, ); + } - const closingErrors = closeFunctionsResults - .filter((result): result is PromiseRejectedResult => result.status === 'rejected') - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - .map((result) => result.reason); - - if (closingErrors.length > 0) { - if (closingErrors[0] instanceof Error) throw closingErrors[0]; - throw new ApplicationError("Error on execution node's close function(s)", { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - cause: closingErrors, - }); - } - - return { data, hints: context.hints }; - } else if (nodeType.poll) { - if (mode === 'manual') { - // In manual mode run the poll function - const context = new PollContext(workflow, node, additionalData, mode, 'manual'); - return { data: await nodeType.poll.call(context) }; - } - // In any other mode pass data through as it already contains the result of the poll - return { data: inputData.main as INodeExecutionData[][] }; - } else if (nodeType.trigger) { - if (mode === 'manual') { - // In manual mode start the trigger - const triggerResponse = await Container.get(TriggersAndPollers).runTrigger( - workflow, - node, - NodeExecuteFunctions.getExecuteTriggerFunctions, - additionalData, - mode, - 'manual', - ); - - if (triggerResponse === undefined) { - return { data: null }; - } - - let closeFunction; - if (triggerResponse.closeFunction) { - // In manual mode we return the trigger closeFunction. That allows it to be called directly - // but we do not have to wait for it to finish. That is important for things like queue-nodes. - // There the full close will may be delayed till a message gets acknowledged after the execution. - // If we would not be able to wait for it to close would it cause problems with "own" mode as the - // process would be killed directly after it and so the acknowledge would not have been finished yet. - closeFunction = triggerResponse.closeFunction; - - // Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers - abortSignal?.addEventListener('abort', closeFunction); - } - - if (triggerResponse.manualTriggerFunction !== undefined) { - // If a manual trigger function is defined call it and wait till it did run - await triggerResponse.manualTriggerFunction(); - } - - const response = await triggerResponse.manualTriggerResponse!; - - if (response.length === 0) { - return { data: null, closeFunction }; - } - - return { data: response, closeFunction }; - } - // For trigger nodes in any mode except "manual" do we simply pass the data through - return { data: inputData.main as INodeExecutionData[][] }; - } else if (nodeType.webhook && !isDeclarativeNode) { + if (nodeType.webhook && !isDeclarativeNode) { // Check if the node have requestDefaults(Declarative Node), // else for webhook nodes always simply pass the data through // as webhook method would be called by WebhookService return { data: inputData.main as INodeExecutionData[][] }; - } else { - // NOTE: This block is only called by nodes tests. - // In the application, declarative nodes get assigned a `.execute` method in NodeTypes. - const context = new ExecuteContext( - workflow, - node, - additionalData, - mode, - runExecutionData, - runIndex, - connectionInputData, - inputData, - executionData, - [], - ); - const routingNode = new RoutingNode(context, nodeType); - const data = await routingNode.runNode(); - return { data }; } + + return await this.executeDeclarativeNodeInTest( + workflow, + node, + nodeType, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executionData, + ); } /**