diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index 4b0154e11c..b06698875c 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import type { IWorkflowBase } from 'n8n-workflow'; @@ -23,9 +24,11 @@ import { } from '@/executions/pre-execution-checks'; import { ExternalHooks } from '@/external-hooks'; import { SecretsHelper } from '@/secrets-helpers.ee'; +import { UrlService } from '@/services/url.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { Telemetry } from '@/telemetry'; import { executeWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data'; +import * as WorkflowHelpers from '@/workflow-helpers'; import { mockInstance } from '@test/mocking'; const EXECUTION_ID = '123'; @@ -97,6 +100,9 @@ describe('WorkflowExecuteAdditionalData', () => { mockInstance(SubworkflowPolicyChecker); mockInstance(WorkflowStatisticsService); + const urlService = mockInstance(UrlService); + Container.set(UrlService, urlService); + test('logAiEvent should call MessageEventBus', async () => { const additionalData = await getBase('user-id'); @@ -264,4 +270,67 @@ describe('WorkflowExecuteAdditionalData', () => { }); }); }); + + describe('getBase', () => { + const mockWebhookBaseUrl = 'webhook-base-url.com'; + jest.spyOn(urlService, 'getWebhookBaseUrl').mockReturnValue(mockWebhookBaseUrl); + + const globalConfig = mockInstance(GlobalConfig); + Container.set(GlobalConfig, globalConfig); + globalConfig.endpoints = mock({ + rest: '/rest/', + formWaiting: '/form-waiting/', + webhook: '/webhook/', + webhookWaiting: '/webhook-waiting/', + webhookTest: '/webhook-test/', + }); + + const mockVariables = { variable: 1 }; + jest.spyOn(WorkflowHelpers, 'getVariables').mockResolvedValue(mockVariables); + + it('should return base additional data with default values', async () => { + const additionalData = await getBase(); + + expect(additionalData).toMatchObject({ + currentNodeExecutionIndex: 0, + credentialsHelper, + executeWorkflow: expect.any(Function), + restApiUrl: `${mockWebhookBaseUrl}/rest/`, + instanceBaseUrl: mockWebhookBaseUrl, + formWaitingBaseUrl: `${mockWebhookBaseUrl}/form-waiting/`, + webhookBaseUrl: `${mockWebhookBaseUrl}/webhook/`, + webhookWaitingBaseUrl: `${mockWebhookBaseUrl}/webhook-waiting/`, + webhookTestBaseUrl: `${mockWebhookBaseUrl}/webhook-test/`, + currentNodeParameters: undefined, + executionTimeoutTimestamp: undefined, + userId: undefined, + setExecutionStatus: expect.any(Function), + variables: mockVariables, + secretsHelpers: secretsHelper, + startRunnerTask: expect.any(Function), + logAiEvent: expect.any(Function), + }); + }); + + it('should include userId when provided', async () => { + const userId = 'test-user-id'; + const additionalData = await getBase(userId); + + expect(additionalData.userId).toBe(userId); + }); + + it('should include currentNodeParameters when provided', async () => { + const currentNodeParameters = { param1: 'value1' }; + const additionalData = await getBase(undefined, currentNodeParameters); + + expect(additionalData.currentNodeParameters).toBe(currentNodeParameters); + }); + + it('should include executionTimeoutTimestamp when provided', async () => { + const executionTimeoutTimestamp = Date.now() + 1000; + const additionalData = await getBase(undefined, undefined, executionTimeoutTimestamp); + + expect(additionalData.executionTimeoutTimestamp).toBe(executionTimeoutTimestamp); + }); + }); }); diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 8a66611499..5473b717fc 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -12,6 +12,7 @@ import type { IWorkflowBase, IWorkflowExecutionDataProcess, StartNodeData, + IWorkflowExecuteAdditionalData, } from 'n8n-workflow'; import { Workflow, type ExecutionError } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; @@ -22,7 +23,9 @@ import type { ExecutionEntity } from '@/databases/entities/execution-entity'; import type { User } from '@/databases/entities/user'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { CredentialsPermissionChecker } from '@/executions/pre-execution-checks'; +import { ManualExecutionService } from '@/manual-execution.service'; import { Telemetry } from '@/telemetry'; +import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { WorkflowRunner } from '@/workflow-runner'; import { mockInstance } from '@test/mocking'; import { createExecution } from '@test-integration/db/executions'; @@ -197,4 +200,61 @@ describe('run', () => { // ASSERT expect(recreateNodeExecutionStackSpy).not.toHaveBeenCalled(); }); + + it('run partial execution with additional data', async () => { + // ARRANGE + const activeExecutions = Container.get(ActiveExecutions); + jest.spyOn(activeExecutions, 'add').mockResolvedValue('1'); + jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce(); + const permissionChecker = Container.get(CredentialsPermissionChecker); + jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce(); + jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce( + new PCancelable(() => { + return mock(); + }), + ); + + jest.spyOn(Workflow.prototype, 'getNode').mockReturnValueOnce(mock()); + jest.spyOn(DirectedGraph, 'fromWorkflow').mockReturnValueOnce(new DirectedGraph()); + + const additionalData = mock(); + jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData); + jest.spyOn(ManualExecutionService.prototype, 'runManually'); + jest.spyOn(core, 'recreateNodeExecutionStack').mockReturnValueOnce({ + nodeExecutionStack: mock(), + waitingExecution: mock(), + waitingExecutionSource: mock(), + }); + + const data = mock({ + triggerToStartFrom: { name: 'trigger', data: mock() }, + + workflowData: { nodes: [] }, + executionData: undefined, + startNodes: [mock()], + destinationNode: undefined, + runData: { + trigger: [mock({ executionIndex: 7 })], + otherNode: [mock({ executionIndex: 8 }), mock({ executionIndex: 9 })], + }, + userId: 'mock-user-id', + }); + + // ACT + await runner.run(data); + + // ASSERT + expect(WorkflowExecuteAdditionalData.getBase).toHaveBeenCalledWith( + data.userId, + undefined, + undefined, + ); + expect(ManualExecutionService.prototype.runManually).toHaveBeenCalledWith( + data, + expect.any(Workflow), + additionalData, + '1', + undefined, + ); + }); }); diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index 07626dea4c..f88bb415be 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -1,7 +1,8 @@ import { mock } from 'jest-mock-extended'; import type { Logger } from 'n8n-core'; import { mockInstance } from 'n8n-core/test/utils'; -import type { IRunExecutionData, WorkflowExecuteMode } from 'n8n-workflow/src'; +import type { IPinData, ITaskData, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import { Workflow, type IRunExecutionData, type WorkflowExecuteMode } from 'n8n-workflow'; import { CredentialsHelper } from '@/credentials-helper'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; @@ -11,6 +12,7 @@ import type { ManualExecutionService } from '@/manual-execution.service'; import { SecretsHelper } from '@/secrets-helpers.ee'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import type { IExecutionResponse } from '@/types-db'; +import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { JobProcessor } from '../job-processor'; @@ -80,4 +82,57 @@ describe('JobProcessor', () => { expect(manualExecutionService.runManually).toHaveBeenCalledTimes(1); }, ); + + it('should pass additional data for partial executions to run', async () => { + const executionRepository = mock(); + const pinData: IPinData = { pinned: [] }; + const execution = mock({ + mode: 'manual', + workflowData: { nodes: [], pinData }, + data: mock({ + isTestWebhook: false, + resultData: { + runData: { + trigger: [mock({ executionIndex: 1 })], + node: [mock({ executionIndex: 3 }), mock({ executionIndex: 4 })], + }, + pinData, + }, + }), + }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + + const additionalData = mock(); + jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData); + + const manualExecutionService = mock(); + const jobProcessor = new JobProcessor( + logger, + mock(), + executionRepository, + mock(), + mock(), + mock(), + manualExecutionService, + ); + + const executionId = 'execution-id'; + await jobProcessor.processJob(mock({ data: { executionId, loadStaticData: false } })); + + expect(WorkflowExecuteAdditionalData.getBase).toHaveBeenCalledWith( + undefined, + undefined, + undefined, + ); + + expect(manualExecutionService.runManually).toHaveBeenCalledWith( + expect.objectContaining({ + executionMode: 'manual', + }), + expect.any(Workflow), + additionalData, + executionId, + pinData, + ); + }); }); diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 92cc484c3c..6fc680b20c 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -235,6 +235,7 @@ export class WorkflowRunner { settings: workflowSettings, pinData, }); + const additionalData = await WorkflowExecuteAdditionalData.getBase( data.userId, undefined, diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts index b9999f02b7..7cd29a256e 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts @@ -843,6 +843,106 @@ describe('WorkflowExecute', () => { expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1); expect(processRunExecutionDataSpy).toHaveBeenCalledWith(expectedGraph); }); + + // ►► + // ┌───────┐1 ┌─────┐1 ┌─────┐ + // │trigger├──────►node1├──────►node2│ + // └───────┘ └─────┘ └─────┘ + test('increments partial execution index starting with max index of previous runs', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise); + additionalData.hooks = mock(); + jest.spyOn(additionalData.hooks, 'runHook'); + + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const workflow = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 }) + .toWorkflow({ name: '', active: false, nodeTypes }); + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: trigger.name } }], { executionIndex: 0 })], + [node1.name]: [ + toITaskData([{ data: { name: node1.name } }], { executionIndex: 3 }), + toITaskData([{ data: { name: node1.name } }], { executionIndex: 4 }), + ], + [node2.name]: [toITaskData([{ data: { name: node2.name } }], { executionIndex: 2 })], + }; + const dirtyNodeNames: string[] = []; + const destinationNode = node2.name; + + const processRunExecutionDataSpy = jest.spyOn(workflowExecute, 'processRunExecutionData'); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + destinationNode, + ); + + // ASSERT + expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1); + expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('nodeExecuteBefore', [ + node2.name, + expect.objectContaining({ executionIndex: 5 }), + ]); + }); + + // ►► + // ┌───────┐1 ┌─────┐1 + // │trigger├──────►node1| + // └───────┘ └─────┘ + test('increments partial execution index starting with max index of 0 of previous runs', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise); + additionalData.hooks = mock(); + jest.spyOn(additionalData.hooks, 'runHook'); + + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const node1 = createNodeData({ name: 'node1' }); + const workflow = new DirectedGraph() + .addNodes(trigger, node1) + .addConnections({ from: trigger, to: node1 }) + .toWorkflow({ name: '', active: false, nodeTypes }); + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: trigger.name } }], { executionIndex: 0 })], + [node1.name]: [ + toITaskData([{ data: { name: node1.name } }], { executionIndex: 3 }), + toITaskData([{ data: { name: node1.name } }], { executionIndex: 4 }), + ], + }; + const dirtyNodeNames: string[] = []; + const destinationNode = node1.name; + + const processRunExecutionDataSpy = jest.spyOn(workflowExecute, 'processRunExecutionData'); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + destinationNode, + ); + + // ASSERT + expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1); + expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('nodeExecuteBefore', [ + node1.name, + expect.objectContaining({ executionIndex: 1 }), + ]); + }); }); describe('checkReadyForExecution', () => { diff --git a/packages/core/src/execution-engine/partial-execution-utils/__tests__/helpers.ts b/packages/core/src/execution-engine/partial-execution-utils/__tests__/helpers.ts index 5fffd28346..d88fcbe1c7 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/__tests__/helpers.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/__tests__/helpers.ts @@ -33,7 +33,7 @@ type TaskData = { nodeConnectionType?: NodeConnectionType; }; -export function toITaskData(taskData: TaskData[]): ITaskData { +export function toITaskData(taskData: TaskData[], overrides?: Partial): ITaskData { const result: ITaskData = { executionStatus: 'success', executionTime: 0, @@ -41,6 +41,7 @@ export function toITaskData(taskData: TaskData[]): ITaskData { executionIndex: 0, source: [], data: {}, + ...(overrides ?? {}), }; // NOTE: Here to make TS happy. diff --git a/packages/core/src/execution-engine/partial-execution-utils/__tests__/run-data-utils.test.ts b/packages/core/src/execution-engine/partial-execution-utils/__tests__/run-data-utils.test.ts new file mode 100644 index 0000000000..b95b58ff86 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/__tests__/run-data-utils.test.ts @@ -0,0 +1,61 @@ +import { mock } from 'jest-mock-extended'; +import type { IRunData } from 'n8n-workflow'; + +import { getNextExecutionIndex } from '../run-data-utils'; + +describe('getNextExecutionIndex', () => { + it('should return 0 if runData is undefined', () => { + const result = getNextExecutionIndex(undefined); + expect(result).toBe(0); + }); + + it('should return 0 if runData is empty', () => { + const result = getNextExecutionIndex({}); + expect(result).toBe(0); + }); + + it('should return the next execution index based on the highest executionIndex in runData', () => { + const runData = mock({ + node1: [{ executionIndex: 0 }, { executionIndex: 1 }], + node2: [{ executionIndex: 2 }], + }); + const result = getNextExecutionIndex(runData); + expect(result).toBe(3); + }); + + it('should return 1 if all tasks in runData have executionIndex 0', () => { + const runData = mock({ + node1: [{ executionIndex: 0 }, { executionIndex: 0 }], + node2: [{ executionIndex: 0 }], + }); + const result = getNextExecutionIndex(runData); + expect(result).toBe(1); + }); + + it('should handle runData with mixed executionIndex values', () => { + const runData = mock({ + node1: [{ executionIndex: 5 }, { executionIndex: 3 }], + node2: [{ executionIndex: 7 }, { executionIndex: 2 }], + }); + const result = getNextExecutionIndex(runData); + expect(result).toBe(8); + }); + + it('should handle runData with missing executionIndex values', () => { + const runData = mock({ + node1: [{}], + node2: [{}, {}], + }); + const result = getNextExecutionIndex(runData); + expect(result).toBe(0); + }); + + it('should handle runData with negative executionIndex values', () => { + const runData = mock({ + node1: [{ executionIndex: -5 }, { executionIndex: -10 }], + node2: [{ executionIndex: -2 }], + }); + const result = getNextExecutionIndex(runData); + expect(result).toBe(-1); + }); +}); diff --git a/packages/core/src/execution-engine/partial-execution-utils/index.ts b/packages/core/src/execution-engine/partial-execution-utils/index.ts index f7eb5c6eca..5400ccbf3b 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/index.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/index.ts @@ -8,3 +8,4 @@ export { handleCycles } from './handle-cycles'; export { filterDisabledNodes } from './filter-disabled-nodes'; export { isTool } from './is-tool'; export { rewireGraph } from './rewire-graph'; +export { getNextExecutionIndex } from './run-data-utils'; diff --git a/packages/core/src/execution-engine/partial-execution-utils/run-data-utils.ts b/packages/core/src/execution-engine/partial-execution-utils/run-data-utils.ts new file mode 100644 index 0000000000..df9b4f8ad1 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/run-data-utils.ts @@ -0,0 +1,26 @@ +import type { IRunData } from 'n8n-workflow'; + +/** + * Calculates the next execution index by finding the highest existing index in the run data and incrementing by 1. + * + * The execution index is used to track the sequence of workflow executions. + * + * @param {IRunData} [runData={}] + * @returns {number} The next execution index (previous highest index + 1, or 0 if no previous executionIndex exist). + */ +export function getNextExecutionIndex(runData: IRunData = {}): number { + // If runData is empty, return 0 as the first execution index + if (!runData || Object.keys(runData).length === 0) return 0; + + const previousIndices = Object.values(runData) + .flat() + .map((taskData) => taskData.executionIndex) + // filter out undefined if previous execution does not have index + // this can happen if rerunning execution before executionIndex was introduced + .filter((value) => typeof value === 'number'); + + // If no valid indices were found, return 0 as the first execution index + if (previousIndices.length === 0) return 0; + + return Math.max(...previousIndices) + 1; +} diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index fed897536c..254e202a33 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -70,6 +70,7 @@ import { filterDisabledNodes, rewireGraph, isTool, + getNextExecutionIndex, } from './partial-execution-utils'; import { RoutingNode } from './routing-node'; import { TriggersAndPollers } from './triggers-and-pollers'; @@ -194,6 +195,9 @@ export class WorkflowExecute { let incomingNodeConnections: INodeConnections | undefined; let connection: IConnection; + // Increment currentExecutionIndex based on previous run + this.additionalData.currentNodeExecutionIndex = getNextExecutionIndex(runData); + this.status = 'running'; const runIndex = 0; @@ -428,6 +432,10 @@ export class WorkflowExecute { recreateNodeExecutionStack(graph, startNodes, runData, pinData ?? {}); // 8. Execute + + // Increment currentExecutionIndex based on previous run + this.additionalData.currentNodeExecutionIndex = getNextExecutionIndex(runData); + this.status = 'running'; this.runExecutionData = { startData: {