diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 90a0b39660..5f32136897 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -591,6 +591,7 @@ export class ActiveWorkflowRunner { data: { main: data, }, + source: null, }, ]; @@ -603,6 +604,7 @@ export class ActiveWorkflowRunner { contextData: {}, nodeExecutionStack, waitingExecution: {}, + waitingExecutionSource: {}, }, }; diff --git a/packages/cli/src/CredentialsHelper.ts b/packages/cli/src/CredentialsHelper.ts index 7707cd1a39..0e50433173 100644 --- a/packages/cli/src/CredentialsHelper.ts +++ b/packages/cli/src/CredentialsHelper.ts @@ -190,6 +190,7 @@ export class CredentialsHelper extends ICredentialsHelper { 'internal', defaultTimezone, additionalKeys, + undefined, '', ); @@ -366,6 +367,7 @@ export class CredentialsHelper extends ICredentialsHelper { mode, timezone, {}, + undefined, false, decryptedData, ) as ICredentialDataDecryptedObject; @@ -398,6 +400,7 @@ export class CredentialsHelper extends ICredentialsHelper { defaultTimezone, {}, undefined, + undefined, decryptedData, ) as ICredentialDataDecryptedObject; } @@ -642,6 +645,7 @@ export class CredentialsHelper extends ICredentialsHelper { inputData, runIndex, nodeTypeCopy, + { node, data: {}, source: null }, NodeExecuteFunctions, credentialsDecrypted, ); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index b2700afc48..2083acb510 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -198,6 +198,7 @@ export async function executeWebhook( executionMode, additionalData.timezone, additionalKeys, + undefined, 'onReceived', ); const responseCode = workflow.expression.getSimpleParameterValue( @@ -206,6 +207,7 @@ export async function executeWebhook( executionMode, additionalData.timezone, additionalKeys, + undefined, 200, ) as number; @@ -215,6 +217,7 @@ export async function executeWebhook( executionMode, additionalData.timezone, additionalKeys, + undefined, 'firstEntryJson', ); @@ -288,6 +291,7 @@ export async function executeWebhook( additionalData.timezone, additionalKeys, undefined, + undefined, ) as { entries?: | Array<{ @@ -373,6 +377,7 @@ export async function executeWebhook( data: { main: webhookResultData.workflowData, }, + source: null, }); runExecutionData = @@ -546,6 +551,7 @@ export async function executeWebhook( additionalData.timezone, additionalKeys, undefined, + undefined, ); if (responsePropertyName !== undefined) { @@ -559,6 +565,7 @@ export async function executeWebhook( additionalData.timezone, additionalKeys, undefined, + undefined, ); if (responseContentType !== undefined) { @@ -603,6 +610,7 @@ export async function executeWebhook( executionMode, additionalData.timezone, additionalKeys, + undefined, 'data', ); diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index d29deecf94..f0f73ac2b9 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -397,6 +397,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx contextData: {}, nodeExecutionStack: [], waitingExecution: {}, + waitingExecutionSource: {}, }, }; } @@ -752,6 +753,7 @@ export async function getRunData( data: { main: [inputData], }, + source: null, }); const runExecutionData: IRunExecutionData = { @@ -763,6 +765,7 @@ export async function getRunData( contextData: {}, nodeExecutionStack, waitingExecution: {}, + waitingExecutionSource: {}, }, }; diff --git a/packages/cli/src/WorkflowHelpers.ts b/packages/cli/src/WorkflowHelpers.ts index 8f4bd1f917..e625c6c591 100644 --- a/packages/cli/src/WorkflowHelpers.ts +++ b/packages/cli/src/WorkflowHelpers.ts @@ -189,6 +189,7 @@ export async function executeErrorWorkflow( ], ], }, + source: null, }); const runExecutionData: IRunExecutionData = { @@ -200,6 +201,7 @@ export async function executeErrorWorkflow( contextData: {}, nodeExecutionStack, waitingExecution: {}, + waitingExecutionSource: {}, }, }; diff --git a/packages/core/src/LoadNodeParameterOptions.ts b/packages/core/src/LoadNodeParameterOptions.ts index ee24e85a61..c38983c7f9 100644 --- a/packages/core/src/LoadNodeParameterOptions.ts +++ b/packages/core/src/LoadNodeParameterOptions.ts @@ -210,6 +210,7 @@ export class LoadNodeParameterOptions { inputData, runIndex, tempNode, + { node: node!, source: null, data: {} }, NodeExecuteFunctions, ); diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 03cf4a0242..0d0dbfe4a4 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -56,6 +56,7 @@ import { WorkflowDataProxy, WorkflowExecuteMode, LoggerProxy as Logger, + IExecuteData, } from 'n8n-workflow'; import { Agent } from 'https'; @@ -1447,6 +1448,7 @@ export function getNodeParameter( mode: WorkflowExecuteMode, timezone: string, additionalKeys: IWorkflowDataProxyAdditionalKeys, + executeData?: IExecuteData, fallbackValue?: any, ): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object { const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); @@ -1472,11 +1474,13 @@ export function getNodeParameter( mode, timezone, additionalKeys, + executeData, ); returnData = cleanupParameterData(returnData); } catch (e) { - e.message += ` [Error in parameter: "${parameterName}"]`; + if (e.context) e.context.parameter = parameterName; + e.cause = value; throw e; } @@ -1543,6 +1547,7 @@ export function getNodeWebhookUrl( mode, timezone, additionalKeys, + undefined, false, ) as boolean; return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath); @@ -1673,6 +1678,7 @@ export function getExecutePollFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + undefined, fallbackValue, ); }, @@ -1827,6 +1833,7 @@ export function getExecuteTriggerFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + undefined, fallbackValue, ); }, @@ -1940,6 +1947,7 @@ export function getExecuteFunctions( inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, + executeData: IExecuteData, mode: WorkflowExecuteMode, ): IExecuteFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node) => { @@ -1959,6 +1967,7 @@ export function getExecuteFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, ); }, async executeWorkflow( @@ -2035,6 +2044,7 @@ export function getExecuteFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, fallbackValue, ); }, @@ -2050,6 +2060,9 @@ export function getExecuteFunctions( getTimezone: (): string => { return getTimezone(workflow, additionalData); }, + getExecuteData: (): IExecuteData => { + return executeData; + }, getWorkflow: () => { return getWorkflowMetadata(workflow); }, @@ -2065,6 +2078,7 @@ export function getExecuteFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, ); return dataProxy.getDataProxy(); }, @@ -2199,6 +2213,7 @@ export function getExecuteSingleFunctions( node: INode, itemIndex: number, additionalData: IWorkflowExecuteAdditionalData, + executeData: IExecuteData, mode: WorkflowExecuteMode, ): IExecuteSingleFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => { @@ -2219,6 +2234,7 @@ export function getExecuteSingleFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, ); }, getContext(type: string): IContextObject { @@ -2276,6 +2292,9 @@ export function getExecuteSingleFunctions( getTimezone: (): string => { return getTimezone(workflow, additionalData); }, + getExecuteData: (): IExecuteData => { + return executeData; + }, getNodeParameter: ( parameterName: string, fallbackValue?: any, @@ -2296,6 +2315,7 @@ export function getExecuteSingleFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, fallbackValue, ); }, @@ -2314,6 +2334,7 @@ export function getExecuteSingleFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + executeData, ); return dataProxy.getDataProxy(); }, @@ -2471,6 +2492,7 @@ export function getLoadOptionsFunctions( 'internal' as WorkflowExecuteMode, additionalData.timezone, getAdditionalKeys(additionalData), + undefined, fallbackValue, ); }, @@ -2601,6 +2623,7 @@ export function getExecuteHookFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + undefined, fallbackValue, ); }, @@ -2763,6 +2786,7 @@ export function getExecuteWebhookFunctions( mode, additionalData.timezone, getAdditionalKeys(additionalData), + undefined, fallbackValue, ); }, diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 9b36f1d517..177ca9efca 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -22,9 +22,12 @@ import { IRun, IRunData, IRunExecutionData, + ISourceData, ITaskData, ITaskDataConnections, + ITaskDataConnectionsSource, IWaitingForExecution, + IWaitingForExecutionSource, IWorkflowExecuteAdditionalData, LoggerProxy as Logger, NodeApiError, @@ -61,6 +64,7 @@ export class WorkflowExecute { contextData: {}, nodeExecutionStack: [], waitingExecution: {}, + waitingExecutionSource: {}, }, }; } @@ -106,6 +110,7 @@ export class WorkflowExecute { ], ], }, + source: null, }, ]; @@ -121,6 +126,7 @@ export class WorkflowExecute { contextData: {}, nodeExecutionStack, waitingExecution: {}, + waitingExecutionSource: {}, }, }; @@ -157,10 +163,12 @@ export class WorkflowExecute { // the data from runData const nodeExecutionStack: IExecuteData[] = []; const waitingExecution: IWaitingForExecution = {}; + const waitingExecutionSource: IWaitingForExecutionSource = {}; for (const startNode of startNodes) { incomingNodeConnections = workflow.connectionsByDestinationNode[startNode]; const incomingData: INodeExecutionData[][] = []; + let incomingSourceData: ITaskDataConnectionsSource | null = null; if (incomingNodeConnections === undefined) { // If it has no incoming data add the default empty data @@ -171,6 +179,7 @@ export class WorkflowExecute { ]); } else { // Get the data of the incoming connections + incomingSourceData = { main: [] }; for (const connections of incomingNodeConnections.main) { for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) { connection = connections[inputIndex]; @@ -178,6 +187,9 @@ export class WorkflowExecute { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion runData[connection.node][runIndex].data![connection.type][connection.index]!, ); + incomingSourceData.main.push({ + previousNode: connection.node, + }); } } } @@ -187,6 +199,7 @@ export class WorkflowExecute { data: { main: incomingData, }, + source: incomingSourceData, }; nodeExecutionStack.push(executeData); @@ -201,12 +214,15 @@ export class WorkflowExecute { if (waitingExecution[destinationNode] === undefined) { waitingExecution[destinationNode] = {}; + waitingExecutionSource[destinationNode] = {}; } if (waitingExecution[destinationNode][runIndex] === undefined) { waitingExecution[destinationNode][runIndex] = {}; + waitingExecutionSource[destinationNode][runIndex] = {}; } if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) { waitingExecution[destinationNode][runIndex][connection.type] = []; + waitingExecutionSource[destinationNode][runIndex][connection.type] = []; } if (runData[connection.node] !== undefined) { @@ -215,8 +231,14 @@ export class WorkflowExecute { waitingExecution[destinationNode][runIndex][connection.type].push( runData[connection.node][runIndex].data![connection.type][connection.index], ); + waitingExecutionSource[destinationNode][runIndex][connection.type].push({ + previousNode: connection.node, + previousNodeOutput: connection.index || undefined, + previousNodeRun: runIndex || undefined, + } as ISourceData); } else { waitingExecution[destinationNode][runIndex][connection.type].push(null); + waitingExecutionSource[destinationNode][runIndex][connection.type].push(null); } } } @@ -241,6 +263,7 @@ export class WorkflowExecute { contextData: {}, nodeExecutionStack, waitingExecution, + waitingExecutionSource, }, }; @@ -303,12 +326,17 @@ export class WorkflowExecute { // Node has multiple inputs let nodeWasWaiting = true; + if (this.runExecutionData.executionData!.waitingExecutionSource === null) { + this.runExecutionData.executionData!.waitingExecutionSource = {}; + } + // Check if there is already data for the node if ( this.runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined ) { // Node does not have data yet so create a new empty one this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node] = {}; nodeWasWaiting = false; } if ( @@ -319,6 +347,10 @@ export class WorkflowExecute { this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { main: [], }; + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][runIndex] = + { + main: [], + }; for ( let i = 0; i < workflow.connectionsByDestinationNode[connectionData.node].main.length; @@ -327,6 +359,10 @@ export class WorkflowExecute { this.runExecutionData.executionData!.waitingExecution[connectionData.node][ runIndex ].main.push(null); + + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ].main.push(null); } } @@ -335,10 +371,20 @@ export class WorkflowExecute { this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[ connectionData.index ] = null; + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ].main[connectionData.index] = null; } else { this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[ connectionData.index ] = nodeSuccessData[outputIndex]; + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ].main[connectionData.index] = { + previousNode: parentNodeName, + previousNodeOutput: outputIndex || undefined, + previousNodeRun: runIndex || undefined, + }; } // Check if all data exists now @@ -364,15 +410,32 @@ export class WorkflowExecute { if (allDataFound) { // All data exists for node to be executed // So add it to the execution stack - this.runExecutionData.executionData!.nodeExecutionStack.push({ + + const executionStackItem = { node: workflow.nodes[connectionData.node], data: this.runExecutionData.executionData!.waitingExecution[connectionData.node][ runIndex ], - }); + source: + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ], + } as IExecuteData; + + if (this.runExecutionData.executionData!.waitingExecutionSource !== null) { + executionStackItem.source = + this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ]; + } + + this.runExecutionData.executionData!.nodeExecutionStack.push(executionStackItem); // Remove the data from waiting delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]; + delete this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][ + runIndex + ]; if ( Object.keys(this.runExecutionData.executionData!.waitingExecution[connectionData.node]) @@ -380,6 +443,7 @@ export class WorkflowExecute { ) { // No more data left for the node so also delete that one delete this.runExecutionData.executionData!.waitingExecution[connectionData.node]; + delete this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node]; } return; } @@ -534,6 +598,15 @@ export class WorkflowExecute { ], ], }, + source: { + main: [ + { + previousNode: parentNodeName, + previousNodeOutput: outputIndex || undefined, + previousNodeRun: runIndex || undefined, + }, + ], + }, }); } } @@ -571,6 +644,15 @@ export class WorkflowExecute { data: { main: connectionDataArray, }, + source: { + main: [ + { + previousNode: parentNodeName, + previousNodeOutput: outputIndex || undefined, + previousNodeRun: runIndex || undefined, + }, + ], + }, }); } } @@ -660,6 +742,7 @@ export class WorkflowExecute { data: { main: executionData.data.main, } as ITaskDataConnections, + source: [], }, ], }, @@ -691,6 +774,29 @@ export class WorkflowExecute { this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData; executionNode = executionData.node; + // Update the pairedItem information on items + const newTaskDataConnections: ITaskDataConnections = {}; + for (const inputName of Object.keys(executionData.data)) { + newTaskDataConnections[inputName] = executionData.data[inputName].map( + (input, inputIndex) => { + if (input === null) { + return input; + } + + return input.map((item, itemIndex) => { + return { + ...item, + pairedItem: { + item: itemIndex, + input: inputIndex || undefined, + }, + }; + }); + }, + ); + } + executionData.data = newTaskDataConnections; + Logger.debug(`Start processing node "${executionNode.name}"`, { node: executionNode.name, workflowId: workflow.id, @@ -767,9 +873,6 @@ export class WorkflowExecute { } } - // Clone input data that nodes can not mess up data of parallel nodes which receive the same data - // TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned - // is very slow so only do if needed startTime = new Date().getTime(); let maxTries = 1; @@ -813,8 +916,7 @@ export class WorkflowExecute { workflowId: workflow.id, }); const runNodeData = await workflow.runNode( - executionData.node, - executionData.data, + executionData, this.runExecutionData, runIndex, this.additionalData, @@ -834,6 +936,30 @@ export class WorkflowExecute { workflowId: workflow.id, }); + // Check if the output data contains pairedItem data + checkOutputData: for (const outputData of nodeSuccessData as INodeExecutionData[][]) { + if (outputData === null) { + continue; + } + for (const item of outputData) { + if (!item.pairedItem) { + // The pairedItem is missing so check if it can get automatically fixed + if ( + executionData.data.main.length !== 1 || + executionData.data.main[0]?.length !== 1 + ) { + // Automatically fixing is only possible if there is only one + // input and one input item + break checkOutputData; + } + + item.pairedItem = { + item: 0, + }; + } + } + } + if (nodeSuccessData === undefined) { // Node did not get executed nodeSuccessData = null; @@ -885,6 +1011,7 @@ export class WorkflowExecute { taskData = { startTime, executionTime: new Date().getTime() - startTime, + source: executionData.source === null ? [] : executionData.source.main, }; if (executionError !== undefined) { diff --git a/packages/editor-ui/src/components/CodeEdit.vue b/packages/editor-ui/src/components/CodeEdit.vue index 04a5808d48..b23d734284 100644 --- a/packages/editor-ui/src/components/CodeEdit.vue +++ b/packages/editor-ui/src/components/CodeEdit.vue @@ -144,7 +144,10 @@ export default mixins( const workflow = this.getWorkflow(); const activeNode: INodeUi | null = this.$store.getters.activeNode; const parentNode = workflow.getParentNodes(activeNode!.name, inputName, 1); - const inputIndex = workflow.getNodeConnectionOutputIndex(activeNode!.name, parentNode[0]) || 0; + const nodeConnection = workflow.getNodeConnectionIndexes(activeNode!.name, parentNode[0]) || { + sourceIndex: 0, + destinationIndex: 0, + }; const autocompleteData: string[] = []; @@ -164,7 +167,7 @@ export default mixins( } } - const connectionInputData = this.connectionInputData(parentNode, inputName, runIndex, inputIndex); + const connectionInputData = this.connectionInputData(parentNode, activeNode!.name, inputName, runIndex, nodeConnection); const additionalProxyKeys: IWorkflowDataProxyAdditionalKeys = { $executionId: PLACEHOLDER_FILLED_AT_EXECUTION_TIME, diff --git a/packages/editor-ui/src/components/Error/NodeErrorView.vue b/packages/editor-ui/src/components/Error/NodeErrorView.vue index c36fdd30a3..4637016057 100644 --- a/packages/editor-ui/src/components/Error/NodeErrorView.vue +++ b/packages/editor-ui/src/components/Error/NodeErrorView.vue @@ -1,7 +1,7 @@