import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core';
import type {
	IExecuteData,
	INode,
	IRun,
	ITaskData,
	IWaitingForExecution,
	IWaitingForExecutionSource,
	IWorkflowBase,
	IWorkflowExecutionDataProcess,
	StartNodeData,
} from 'n8n-workflow';
import { Workflow, type ExecutionError } from 'n8n-workflow';
import PCancelable from 'p-cancelable';

import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { User } from '@/databases/entities/user';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@test/mocking';
import { createExecution } from '@test-integration/db/executions';
import { createUser } from '@test-integration/db/users';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
import { setupTestServer } from '@test-integration/utils';

// Shared fixtures for every suite in this file; assigned once in `beforeAll`.
let owner: User;
let runner: WorkflowRunner;

setupTestServer({ endpointGroups: [] });

mockInstance(Telemetry);

beforeAll(async () => {
	owner = await createUser({ role: 'global:owner' });

	runner = Container.get(WorkflowRunner);
});

afterAll(() => {
	jest.restoreAllMocks();
});

beforeEach(async () => {
	await testDb.truncate(['Workflow', 'SharedWorkflow']);
	jest.clearAllMocks();
});

describe('processError', () => {
	// Fresh fixtures are created for every test in `beforeEach`, so `hooks` is
	// always bound to the same execution id that the test passes to `processError`.
	let workflow: IWorkflowBase;
	let execution: ExecutionEntity;
	let hooks: core.ExecutionLifecycleHooks;

	// Registered as the `workflowExecuteAfter` handler; each test asserts on how
	// often it was invoked.
	const watcher = mock<{ workflowExecuteAfter: () => Promise<void> }>();

	beforeEach(async () => {
		jest.clearAllMocks();

		// Tests below switch `executions.mode`; start each one from 'regular' so a
		// previous test's 'queue' setting cannot leak into the next test.
		config.set('executions.mode', 'regular');

		workflow = await createWorkflow({}, owner);
		execution = await createExecution({ status: 'success', finished: true }, workflow);
		hooks = new core.ExecutionLifecycleHooks('webhook', execution.id, workflow);
		hooks.addHandler('workflowExecuteAfter', watcher.workflowExecuteAfter);
	});

	test('processError should return early in Bull stalled edge case', async () => {
		// Queue mode + an execution already stored as finished/successful.
		config.set('executions.mode', 'queue');

		await runner.processError(
			new Error('test') as ExecutionError,
			new Date(),
			'webhook',
			execution.id,
			hooks,
		);

		expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0);
	});

	test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
		await runner.processError(
			new ExecutionNotFoundError(execution.id),
			new Date(),
			'webhook',
			execution.id,
			hooks,
		);

		expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0);
	});

	test('processError should process error', async () => {
		// Register the execution as active before reporting the error.
		await Container.get(ActiveExecutions).add(
			{ executionMode: 'webhook', workflowData: workflow },
			execution.id,
		);
		config.set('executions.mode', 'regular');

		await runner.processError(
			new Error('test') as ExecutionError,
			new Date(),
			'webhook',
			execution.id,
			hooks,
		);

		expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(1);
	});
});

describe('run', () => {
	it('uses recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom with data is sent', async () => {
		// ARRANGE
		const activeExecutions = Container.get(ActiveExecutions);
		jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
		jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();

		const permissionChecker = Container.get(PermissionChecker);
		jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();

		jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
			new PCancelable<IRun>(() => {
				return mock<IRun>();
			}),
		);

		jest.spyOn(Workflow.prototype, 'getNode').mockReturnValueOnce(mock<INode>());
		jest.spyOn(DirectedGraph, 'fromWorkflow').mockReturnValueOnce(new DirectedGraph());

		const recreateNodeExecutionStackSpy = jest
			.spyOn(core, 'recreateNodeExecutionStack')
			.mockReturnValueOnce({
				nodeExecutionStack: mock<IExecuteData[]>(),
				waitingExecution: mock<IWaitingForExecution>(),
				waitingExecutionSource: mock<IWaitingForExecutionSource>(),
			});

		// The trigger carries task data, which is the case under test.
		const data = mock<IWorkflowExecutionDataProcess>({
			triggerToStartFrom: { name: 'trigger', data: mock<ITaskData>() },
			workflowData: { nodes: [] },
			executionData: undefined,
			startNodes: [mock<StartNodeData>()],
			destinationNode: undefined,
		});

		// ACT
		await runner.run(data);

		// ASSERT
		expect(recreateNodeExecutionStackSpy).toHaveBeenCalled();
	});

	it('does not use recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom without data is sent', async () => {
		// ARRANGE
		const activeExecutions = Container.get(ActiveExecutions);
		jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
		jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();

		const permissionChecker = Container.get(PermissionChecker);
		jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();

		jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
			new PCancelable<IRun>(() => {
				return mock<IRun>();
			}),
		);

		// Spy only (no stubbed return value): the test asserts it is never called.
		const recreateNodeExecutionStackSpy = jest.spyOn(core, 'recreateNodeExecutionStack');

		// The trigger is named but carries no task data.
		const data = mock<IWorkflowExecutionDataProcess>({
			triggerToStartFrom: { name: 'trigger', data: undefined },
			workflowData: { nodes: [] },
			executionData: undefined,
			startNodes: [mock<StartNodeData>()],
			destinationNode: undefined,
		});

		// ACT
		await runner.run(data);

		// ASSERT
		expect(recreateNodeExecutionStackSpy).not.toHaveBeenCalled();
	});
});