mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-22 12:19:09 +00:00
fix(core): Increment executionIndex in partial executions (no-changelog) (#14946)
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
@@ -843,6 +843,106 @@ describe('WorkflowExecute', () => {
|
||||
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
|
||||
expect(processRunExecutionDataSpy).toHaveBeenCalledWith(expectedGraph);
|
||||
});
|
||||
|
||||
// ►►
|
||||
// ┌───────┐1 ┌─────┐1 ┌─────┐
|
||||
// │trigger├──────►node1├──────►node2│
|
||||
// └───────┘ └─────┘ └─────┘
|
||||
test('increments partial execution index starting with max index of previous runs', async () => {
|
||||
// ARRANGE
|
||||
const waitPromise = createDeferredPromise<IRun>();
|
||||
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
|
||||
additionalData.hooks = mock<ExecutionLifecycleHooks>();
|
||||
jest.spyOn(additionalData.hooks, 'runHook');
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
|
||||
|
||||
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
|
||||
const node1 = createNodeData({ name: 'node1' });
|
||||
const node2 = createNodeData({ name: 'node2' });
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(trigger, node1, node2)
|
||||
.addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 })
|
||||
.toWorkflow({ name: '', active: false, nodeTypes });
|
||||
const pinData: IPinData = {};
|
||||
const runData: IRunData = {
|
||||
[trigger.name]: [toITaskData([{ data: { name: trigger.name } }], { executionIndex: 0 })],
|
||||
[node1.name]: [
|
||||
toITaskData([{ data: { name: node1.name } }], { executionIndex: 3 }),
|
||||
toITaskData([{ data: { name: node1.name } }], { executionIndex: 4 }),
|
||||
],
|
||||
[node2.name]: [toITaskData([{ data: { name: node2.name } }], { executionIndex: 2 })],
|
||||
};
|
||||
const dirtyNodeNames: string[] = [];
|
||||
const destinationNode = node2.name;
|
||||
|
||||
const processRunExecutionDataSpy = jest.spyOn(workflowExecute, 'processRunExecutionData');
|
||||
|
||||
// ACT
|
||||
await workflowExecute.runPartialWorkflow2(
|
||||
workflow,
|
||||
runData,
|
||||
pinData,
|
||||
dirtyNodeNames,
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// ASSERT
|
||||
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
|
||||
expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('nodeExecuteBefore', [
|
||||
node2.name,
|
||||
expect.objectContaining({ executionIndex: 5 }),
|
||||
]);
|
||||
});
|
||||
|
||||
// ►►
|
||||
// ┌───────┐1 ┌─────┐1
|
||||
// │trigger├──────►node1|
|
||||
// └───────┘ └─────┘
|
||||
test('increments partial execution index starting with max index of 0 of previous runs', async () => {
|
||||
// ARRANGE
|
||||
const waitPromise = createDeferredPromise<IRun>();
|
||||
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
|
||||
additionalData.hooks = mock<ExecutionLifecycleHooks>();
|
||||
jest.spyOn(additionalData.hooks, 'runHook');
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
|
||||
|
||||
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
|
||||
const node1 = createNodeData({ name: 'node1' });
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(trigger, node1)
|
||||
.addConnections({ from: trigger, to: node1 })
|
||||
.toWorkflow({ name: '', active: false, nodeTypes });
|
||||
const pinData: IPinData = {};
|
||||
const runData: IRunData = {
|
||||
[trigger.name]: [toITaskData([{ data: { name: trigger.name } }], { executionIndex: 0 })],
|
||||
[node1.name]: [
|
||||
toITaskData([{ data: { name: node1.name } }], { executionIndex: 3 }),
|
||||
toITaskData([{ data: { name: node1.name } }], { executionIndex: 4 }),
|
||||
],
|
||||
};
|
||||
const dirtyNodeNames: string[] = [];
|
||||
const destinationNode = node1.name;
|
||||
|
||||
const processRunExecutionDataSpy = jest.spyOn(workflowExecute, 'processRunExecutionData');
|
||||
|
||||
// ACT
|
||||
await workflowExecute.runPartialWorkflow2(
|
||||
workflow,
|
||||
runData,
|
||||
pinData,
|
||||
dirtyNodeNames,
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// ASSERT
|
||||
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
|
||||
expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('nodeExecuteBefore', [
|
||||
node1.name,
|
||||
expect.objectContaining({ executionIndex: 1 }),
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('checkReadyForExecution', () => {
|
||||
|
||||
@@ -33,7 +33,7 @@ type TaskData = {
|
||||
nodeConnectionType?: NodeConnectionType;
|
||||
};
|
||||
|
||||
export function toITaskData(taskData: TaskData[]): ITaskData {
|
||||
export function toITaskData(taskData: TaskData[], overrides?: Partial<ITaskData>): ITaskData {
|
||||
const result: ITaskData = {
|
||||
executionStatus: 'success',
|
||||
executionTime: 0,
|
||||
@@ -41,6 +41,7 @@ export function toITaskData(taskData: TaskData[]): ITaskData {
|
||||
executionIndex: 0,
|
||||
source: [],
|
||||
data: {},
|
||||
...(overrides ?? {}),
|
||||
};
|
||||
|
||||
// NOTE: Here to make TS happy.
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { IRunData } from 'n8n-workflow';
|
||||
|
||||
import { getNextExecutionIndex } from '../run-data-utils';
|
||||
|
||||
describe('getNextExecutionIndex', () => {
|
||||
it('should return 0 if runData is undefined', () => {
|
||||
const result = getNextExecutionIndex(undefined);
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
||||
it('should return 0 if runData is empty', () => {
|
||||
const result = getNextExecutionIndex({});
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
||||
it('should return the next execution index based on the highest executionIndex in runData', () => {
|
||||
const runData = mock<IRunData>({
|
||||
node1: [{ executionIndex: 0 }, { executionIndex: 1 }],
|
||||
node2: [{ executionIndex: 2 }],
|
||||
});
|
||||
const result = getNextExecutionIndex(runData);
|
||||
expect(result).toBe(3);
|
||||
});
|
||||
|
||||
it('should return 1 if all tasks in runData have executionIndex 0', () => {
|
||||
const runData = mock<IRunData>({
|
||||
node1: [{ executionIndex: 0 }, { executionIndex: 0 }],
|
||||
node2: [{ executionIndex: 0 }],
|
||||
});
|
||||
const result = getNextExecutionIndex(runData);
|
||||
expect(result).toBe(1);
|
||||
});
|
||||
|
||||
it('should handle runData with mixed executionIndex values', () => {
|
||||
const runData = mock<IRunData>({
|
||||
node1: [{ executionIndex: 5 }, { executionIndex: 3 }],
|
||||
node2: [{ executionIndex: 7 }, { executionIndex: 2 }],
|
||||
});
|
||||
const result = getNextExecutionIndex(runData);
|
||||
expect(result).toBe(8);
|
||||
});
|
||||
|
||||
it('should handle runData with missing executionIndex values', () => {
|
||||
const runData = mock<IRunData>({
|
||||
node1: [{}],
|
||||
node2: [{}, {}],
|
||||
});
|
||||
const result = getNextExecutionIndex(runData);
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
||||
it('should handle runData with negative executionIndex values', () => {
|
||||
const runData = mock<IRunData>({
|
||||
node1: [{ executionIndex: -5 }, { executionIndex: -10 }],
|
||||
node2: [{ executionIndex: -2 }],
|
||||
});
|
||||
const result = getNextExecutionIndex(runData);
|
||||
expect(result).toBe(-1);
|
||||
});
|
||||
});
|
||||
@@ -8,3 +8,4 @@ export { handleCycles } from './handle-cycles';
|
||||
export { filterDisabledNodes } from './filter-disabled-nodes';
|
||||
export { isTool } from './is-tool';
|
||||
export { rewireGraph } from './rewire-graph';
|
||||
export { getNextExecutionIndex } from './run-data-utils';
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
import type { IRunData } from 'n8n-workflow';
|
||||
|
||||
/**
|
||||
* Calculates the next execution index by finding the highest existing index in the run data and incrementing by 1.
|
||||
*
|
||||
* The execution index is used to track the sequence of workflow executions.
|
||||
*
|
||||
* @param {IRunData} [runData={}]
|
||||
* @returns {number} The next execution index (previous highest index + 1, or 0 if no previous executionIndex exist).
|
||||
*/
|
||||
export function getNextExecutionIndex(runData: IRunData = {}): number {
|
||||
// If runData is empty, return 0 as the first execution index
|
||||
if (!runData || Object.keys(runData).length === 0) return 0;
|
||||
|
||||
const previousIndices = Object.values(runData)
|
||||
.flat()
|
||||
.map((taskData) => taskData.executionIndex)
|
||||
// filter out undefined if previous execution does not have index
|
||||
// this can happen if rerunning execution before executionIndex was introduced
|
||||
.filter((value) => typeof value === 'number');
|
||||
|
||||
// If no valid indices were found, return 0 as the first execution index
|
||||
if (previousIndices.length === 0) return 0;
|
||||
|
||||
return Math.max(...previousIndices) + 1;
|
||||
}
|
||||
@@ -70,6 +70,7 @@ import {
|
||||
filterDisabledNodes,
|
||||
rewireGraph,
|
||||
isTool,
|
||||
getNextExecutionIndex,
|
||||
} from './partial-execution-utils';
|
||||
import { RoutingNode } from './routing-node';
|
||||
import { TriggersAndPollers } from './triggers-and-pollers';
|
||||
@@ -194,6 +195,9 @@ export class WorkflowExecute {
|
||||
let incomingNodeConnections: INodeConnections | undefined;
|
||||
let connection: IConnection;
|
||||
|
||||
// Increment currentExecutionIndex based on previous run
|
||||
this.additionalData.currentNodeExecutionIndex = getNextExecutionIndex(runData);
|
||||
|
||||
this.status = 'running';
|
||||
|
||||
const runIndex = 0;
|
||||
@@ -428,6 +432,10 @@ export class WorkflowExecute {
|
||||
recreateNodeExecutionStack(graph, startNodes, runData, pinData ?? {});
|
||||
|
||||
// 8. Execute
|
||||
|
||||
// Increment currentExecutionIndex based on previous run
|
||||
this.additionalData.currentNodeExecutionIndex = getNextExecutionIndex(runData);
|
||||
|
||||
this.status = 'running';
|
||||
this.runExecutionData = {
|
||||
startData: {
|
||||
|
||||
Reference in New Issue
Block a user