mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
fix(Execute Sub-workflow Node): Improve paired item handling for child workflows (#17065)
This commit is contained in:
@@ -392,6 +392,15 @@ export async function getBase(
|
|||||||
userId,
|
userId,
|
||||||
setExecutionStatus,
|
setExecutionStatus,
|
||||||
variables,
|
variables,
|
||||||
|
async getRunExecutionData(executionId) {
|
||||||
|
const executionRepository = Container.get(ExecutionRepository);
|
||||||
|
const executionData = await executionRepository.findSingleExecution(executionId, {
|
||||||
|
unflattenData: true,
|
||||||
|
includeData: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
return executionData?.data;
|
||||||
|
},
|
||||||
externalSecretsProxy: Container.get(ExternalSecretsProxy),
|
externalSecretsProxy: Container.get(ExternalSecretsProxy),
|
||||||
async startRunnerTask(
|
async startRunnerTask(
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
|
|||||||
@@ -142,6 +142,10 @@ export class BaseExecuteContext extends NodeExecutionContext {
|
|||||||
return { ...result, data };
|
return { ...result, data };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getExecutionDataById(executionId: string): Promise<IRunExecutionData | undefined> {
|
||||||
|
return await this.additionalData.getRunExecutionData(executionId);
|
||||||
|
}
|
||||||
|
|
||||||
protected getInputItems(inputIndex: number, connectionType: NodeConnectionType) {
|
protected getInputItems(inputIndex: number, connectionType: NodeConnectionType) {
|
||||||
const inputData = this.inputData[connectionType];
|
const inputData = this.inputData[connectionType];
|
||||||
if (inputData.length < inputIndex) {
|
if (inputData.length < inputIndex) {
|
||||||
|
|||||||
@@ -7,10 +7,12 @@ import type {
|
|||||||
INodeTypeDescription,
|
INodeTypeDescription,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { findPairedItemTroughWorkflowData } from './../../../utils/workflow-backtracking';
|
||||||
import { getWorkflowInfo } from './GenericFunctions';
|
import { getWorkflowInfo } from './GenericFunctions';
|
||||||
import { localResourceMapping } from './methods';
|
import { localResourceMapping } from './methods';
|
||||||
import { generatePairedItemData } from '../../../utils/utilities';
|
import { generatePairedItemData } from '../../../utils/utilities';
|
||||||
import { getCurrentWorkflowInputData } from '../../../utils/workflowInputsResourceMapping/GenericFunctions';
|
import { getCurrentWorkflowInputData } from '../../../utils/workflowInputsResourceMapping/GenericFunctions';
|
||||||
|
|
||||||
export class ExecuteWorkflow implements INodeType {
|
export class ExecuteWorkflow implements INodeType {
|
||||||
description: INodeTypeDescription = {
|
description: INodeTypeDescription = {
|
||||||
displayName: 'Execute Sub-workflow',
|
displayName: 'Execute Sub-workflow',
|
||||||
@@ -425,6 +427,8 @@ export class ExecuteWorkflow implements INodeType {
|
|||||||
return [items];
|
return [items];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const workflowRunData = await this.getExecutionDataById(executionResult.executionId);
|
||||||
|
|
||||||
const workflowResult = executionResult.data as INodeExecutionData[][];
|
const workflowResult = executionResult.data as INodeExecutionData[][];
|
||||||
|
|
||||||
const fallbackPairedItemData = generatePairedItemData(items.length);
|
const fallbackPairedItemData = generatePairedItemData(items.length);
|
||||||
@@ -433,7 +437,20 @@ export class ExecuteWorkflow implements INodeType {
|
|||||||
const sameLength = output.length === items.length;
|
const sameLength = output.length === items.length;
|
||||||
|
|
||||||
for (const [itemIndex, item] of output.entries()) {
|
for (const [itemIndex, item] of output.entries()) {
|
||||||
if (item.pairedItem) continue;
|
if (item.pairedItem) {
|
||||||
|
// If the item already has a paired item, we need to follow these to the start of the child workflow
|
||||||
|
if (workflowRunData !== undefined) {
|
||||||
|
const pairedItem = findPairedItemTroughWorkflowData(
|
||||||
|
workflowRunData,
|
||||||
|
item,
|
||||||
|
itemIndex,
|
||||||
|
);
|
||||||
|
if (pairedItem !== undefined) {
|
||||||
|
item.pairedItem = pairedItem;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (sameLength) {
|
if (sameLength) {
|
||||||
item.pairedItem = { item: itemIndex };
|
item.pairedItem = { item: itemIndex };
|
||||||
|
|||||||
532
packages/nodes-base/utils/workflow-backtracking.test.ts
Normal file
532
packages/nodes-base/utils/workflow-backtracking.test.ts
Normal file
@@ -0,0 +1,532 @@
|
|||||||
|
import type {
|
||||||
|
INodeExecutionData,
|
||||||
|
IPairedItemData,
|
||||||
|
IRunExecutionData,
|
||||||
|
ISourceData,
|
||||||
|
ITaskData,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { previousTaskData, findPairedItemTroughWorkflowData } from './workflow-backtracking';
|
||||||
|
|
||||||
|
describe('backtracking.ts', () => {
|
||||||
|
describe('previousTaskData', () => {
|
||||||
|
it('should return undefined when source is empty', () => {
|
||||||
|
const runData = {};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when source is undefined', () => {
|
||||||
|
const runData = {};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
} as unknown as ITaskData; // Type assertion to match the expected type
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when previousNode is undefined', () => {
|
||||||
|
const runData = {};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{} as unknown as ISourceData],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when run data for previousNode does not exist', () => {
|
||||||
|
const runData = {};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{ previousNode: 'node1' }],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when run data for previousNode is empty', () => {
|
||||||
|
const runData = {
|
||||||
|
node1: [],
|
||||||
|
};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{ previousNode: 'node1' }],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return the correct task data from previousNode', () => {
|
||||||
|
const expectedTaskData: ITaskData = {
|
||||||
|
data: { main: [[{ json: { test: 'value' } }]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData;
|
||||||
|
|
||||||
|
const runData = {
|
||||||
|
node1: [expectedTaskData],
|
||||||
|
};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{ previousNode: 'node1' }],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBe(expectedTaskData);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return correct task data using previousNodeRun index', () => {
|
||||||
|
const taskData1: ITaskData = {
|
||||||
|
data: { main: [[{ json: { run: 1 } }]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData;
|
||||||
|
|
||||||
|
const taskData2: ITaskData = {
|
||||||
|
data: { main: [[{ json: { run: 2 } }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData;
|
||||||
|
|
||||||
|
const runData = {
|
||||||
|
node1: [taskData1, taskData2],
|
||||||
|
};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{ previousNode: 'node1', previousNodeRun: 1 }],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBe(taskData2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should default to index 0 when previousNodeRun is undefined', () => {
|
||||||
|
const taskData1: ITaskData = {
|
||||||
|
data: { main: [[{ json: { run: 1 } }]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData;
|
||||||
|
|
||||||
|
const taskData2: ITaskData = {
|
||||||
|
data: { main: [[{ json: { run: 2 } }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData;
|
||||||
|
|
||||||
|
const runData = {
|
||||||
|
node1: [taskData1, taskData2],
|
||||||
|
};
|
||||||
|
const currentRunData: ITaskData = {
|
||||||
|
source: [{ previousNode: 'node1' }],
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = previousTaskData(runData, currentRunData);
|
||||||
|
|
||||||
|
expect(result).toBe(taskData1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('findPairedItemTroughWorkflowData', () => {
|
||||||
|
it('should return undefined when lastNodeExecuted is undefined', () => {
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {},
|
||||||
|
lastNodeExecuted: undefined,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when no run data exists for lastNodeExecuted', () => {
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when run data is empty', () => {
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return undefined when task data is undefined', () => {
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [undefined as any],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return paired item when no previous task data exists', () => {
|
||||||
|
const expectedPairedItem: IPairedItemData = { item: 0 };
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 0,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 0,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: expectedPairedItem,
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBe(expectedPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should backtrack through workflow data with simple paired item', () => {
|
||||||
|
const finalPairedItem: IPairedItemData = { item: 5 };
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[item]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 1000,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
data: { main: [[{ json: { value: 2 }, pairedItem: finalPairedItem }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
executionIndex: 0,
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBe(finalPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should backtrack through workflow data with object paired item', () => {
|
||||||
|
const finalPairedItem: IPairedItemData = { item: 3, input: 1 };
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0, input: 1 },
|
||||||
|
};
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[item]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
data: { main: [[], [{ json: { value: 2 }, pairedItem: finalPairedItem }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBe(finalPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use itemIndex parameter when paired item is numeric', () => {
|
||||||
|
const finalPairedItem: IPairedItemData = { item: 7 };
|
||||||
|
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: 2, // Numeric paired item
|
||||||
|
};
|
||||||
|
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[item]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
data: {
|
||||||
|
main: [
|
||||||
|
[
|
||||||
|
{ json: {} },
|
||||||
|
{ json: {} },
|
||||||
|
{ json: { value: 2 }, pairedItem: finalPairedItem },
|
||||||
|
],
|
||||||
|
],
|
||||||
|
},
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 5);
|
||||||
|
|
||||||
|
expect(result).toBe(finalPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle multiple levels of backtracking', () => {
|
||||||
|
const finalPairedItem: IPairedItemData = { item: 10 };
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[{ json: { value: 1 }, pairedItem: { item: 0 } }]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node3' }],
|
||||||
|
data: { main: [[{ json: { value: 2 }, pairedItem: { item: 1 } }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node3: [
|
||||||
|
{
|
||||||
|
data: { main: [[null, { json: { value: 3 }, pairedItem: finalPairedItem }]] },
|
||||||
|
executionTime: 300,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 3000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBe(finalPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use last run data when multiple runs exist', () => {
|
||||||
|
const finalPairedItem: IPairedItemData = { item: 15 };
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[{ json: { value: 1 }, pairedItem: { item: 0 } }]] },
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: { main: [[{ json: { value: 2 }, pairedItem: { item: 0 } }]] },
|
||||||
|
executionTime: 150,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1500,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
data: { main: [[{ json: { value: 3 }, pairedItem: finalPairedItem }]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBe(finalPairedItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle missing nodeInformationArray gracefully', () => {
|
||||||
|
const workflowRunData: IRunExecutionData = {
|
||||||
|
resultData: {
|
||||||
|
runData: {
|
||||||
|
node1: [
|
||||||
|
{
|
||||||
|
source: [{ previousNode: 'node2' }],
|
||||||
|
data: {},
|
||||||
|
executionTime: 100,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 1000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
node2: [
|
||||||
|
{
|
||||||
|
data: { main: [[]] },
|
||||||
|
executionTime: 200,
|
||||||
|
executionStatus: 'success',
|
||||||
|
startTime: 2000,
|
||||||
|
} as unknown as ITaskData,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
lastNodeExecuted: 'node1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const item: INodeExecutionData = {
|
||||||
|
json: { test: 'value' },
|
||||||
|
pairedItem: { item: 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0);
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
114
packages/nodes-base/utils/workflow-backtracking.ts
Normal file
114
packages/nodes-base/utils/workflow-backtracking.ts
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
import type {
|
||||||
|
INodeExecutionData,
|
||||||
|
IPairedItemData,
|
||||||
|
IRunExecutionData,
|
||||||
|
ITaskData,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
|
/*
|
||||||
|
* These functions do not cover all possible edge cases for backtracking through workflow run data.
|
||||||
|
* They are designed to work for a simple and linear workflow execution.
|
||||||
|
* If the workflow has branches or complex execution paths, additional logic may be needed.
|
||||||
|
* We should follow up on this and improve the logic in the future.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we cannot backtrack correctly, we return undefined to fallback to the current paired item behavior
|
||||||
|
* failing in these functions will cause the parent workflow to fail
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function retrieves the previous task data for a given task in the workflow run data.
|
||||||
|
* Until there is no more source set
|
||||||
|
*/
|
||||||
|
export function previousTaskData(
|
||||||
|
runData: IRunExecutionData['resultData']['runData'],
|
||||||
|
currentRunData: ITaskData,
|
||||||
|
): ITaskData | undefined {
|
||||||
|
const nextNodeName = currentRunData.source?.[0]?.previousNode;
|
||||||
|
if (!nextNodeName) {
|
||||||
|
return undefined; // No next node
|
||||||
|
}
|
||||||
|
|
||||||
|
const nextRunData = runData[nextNodeName];
|
||||||
|
if (!nextRunData || nextRunData.length === 0) {
|
||||||
|
// We don't expect this case to happen in practice, but if for some reason it happens, we fallback to undefined
|
||||||
|
return undefined; // No run data for the next node
|
||||||
|
}
|
||||||
|
|
||||||
|
const nextRunIndex = currentRunData.source?.[0]?.previousNodeRun ?? 0;
|
||||||
|
|
||||||
|
return nextRunData[nextRunIndex]; // Return the first run data for the next node
|
||||||
|
}
|
||||||
|
|
||||||
|
export function findPairedItemTroughWorkflowData(
|
||||||
|
workflowRunData: IRunExecutionData,
|
||||||
|
item: INodeExecutionData,
|
||||||
|
itemIndex: number,
|
||||||
|
): IPairedItemData | IPairedItemData[] | number | undefined {
|
||||||
|
// The provided item is already the item of the last node executed in this workflow run
|
||||||
|
// So the item.pairedItem is the paired item of the last node executed and is therefore referencing
|
||||||
|
// a node in the previous task data
|
||||||
|
|
||||||
|
const currentNodeName = workflowRunData.resultData.lastNodeExecuted;
|
||||||
|
if (!currentNodeName) {
|
||||||
|
// If no node name is available, then we don't know where to start backtracking
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is the run data of the last node executed in the workflow run
|
||||||
|
const runData = workflowRunData.resultData.runData[currentNodeName];
|
||||||
|
|
||||||
|
if (!runData) {
|
||||||
|
// No run data available for the last node executed
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we are backtracking through the workflow, we start with the last run data
|
||||||
|
const runIndex = runData.length - 1;
|
||||||
|
|
||||||
|
const taskData = runData[runIndex];
|
||||||
|
|
||||||
|
if (!taskData) {
|
||||||
|
// If no run data is available, then the workflow did not run at all
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we are getting the second last task data, because our initial pairedItem points to this.
|
||||||
|
let runDataItem = previousTaskData(workflowRunData.resultData.runData, taskData);
|
||||||
|
|
||||||
|
let pairedItem = item.pairedItem;
|
||||||
|
|
||||||
|
// move the runDataItem to the previous node in the in the workflow execution data
|
||||||
|
// and find the paired item of the current item in the previous task data
|
||||||
|
// We do this until we reach the first task data of the workflow run
|
||||||
|
|
||||||
|
while (runDataItem !== undefined) {
|
||||||
|
// We find the output items for the current run data item
|
||||||
|
const nodeInformationArray = runDataItem.data?.['main'];
|
||||||
|
|
||||||
|
// We find and fallback to 0 for the input index and item index
|
||||||
|
// The input index is the run the node was executed in case it was executed multiple times
|
||||||
|
// The item index is the index of the paired item we are looking for
|
||||||
|
let inputIndex = 0;
|
||||||
|
let nodeIndex = itemIndex;
|
||||||
|
if (typeof pairedItem === 'object') {
|
||||||
|
inputIndex = (pairedItem as IPairedItemData).input ?? 0;
|
||||||
|
nodeIndex = (pairedItem as IPairedItemData).item ?? itemIndex;
|
||||||
|
} else if (typeof pairedItem === 'number') {
|
||||||
|
// If the paired item is a number, we use it as the node index
|
||||||
|
nodeIndex = pairedItem;
|
||||||
|
// and fallback to 0 for the input index
|
||||||
|
inputIndex = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We found the paired item of the current run data item, this points to the node in the previous task data
|
||||||
|
pairedItem = nodeInformationArray?.[inputIndex]?.[nodeIndex]?.pairedItem;
|
||||||
|
|
||||||
|
// We move the runDataItem to the previous task data
|
||||||
|
runDataItem = previousTaskData(workflowRunData.resultData.runData, runDataItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is the paired item that was in the first task data when the workflow was executed
|
||||||
|
return pairedItem;
|
||||||
|
}
|
||||||
@@ -908,6 +908,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
|
|||||||
parentExecution?: RelatedExecution;
|
parentExecution?: RelatedExecution;
|
||||||
},
|
},
|
||||||
): Promise<ExecuteWorkflowData>;
|
): Promise<ExecuteWorkflowData>;
|
||||||
|
getExecutionDataById(executionId: string): Promise<IRunExecutionData | undefined>;
|
||||||
getInputConnectionData(
|
getInputConnectionData(
|
||||||
connectionType: AINodeConnectionType,
|
connectionType: AINodeConnectionType,
|
||||||
itemIndex: number,
|
itemIndex: number,
|
||||||
@@ -2390,6 +2391,7 @@ export interface IWorkflowExecuteAdditionalData {
|
|||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
options: ExecuteWorkflowOptions,
|
options: ExecuteWorkflowOptions,
|
||||||
) => Promise<ExecuteWorkflowData>;
|
) => Promise<ExecuteWorkflowData>;
|
||||||
|
getRunExecutionData: (executionId: string) => Promise<IRunExecutionData | undefined>;
|
||||||
executionId?: string;
|
executionId?: string;
|
||||||
restartExecutionId?: string;
|
restartExecutionId?: string;
|
||||||
currentNodeExecutionIndex: number;
|
currentNodeExecutionIndex: number;
|
||||||
|
|||||||
Reference in New Issue
Block a user