feat(core): Add optional Error-Output (#7460)

Add an additional optional error output to which all items get sent that
could not be processed.
![Screenshot from 2023-10-18
17-29-15](https://github.com/n8n-io/n8n/assets/6249596/e9732807-ab2b-4662-a5f6-bdff24f7ad55)

Github issue / Community forum post (link here to close automatically):
https://community.n8n.io/t/error-connector-for-nodes/3094

https://community.n8n.io/t/error-handling-at-node-level-detect-node-execution-status/26791

---------

Co-authored-by: OlegIvaniv <me@olegivaniv.com>
This commit is contained in:
Jan Oberhauser
2023-10-30 18:42:47 +01:00
committed by GitHub
parent 442c73e63b
commit 655efeaf66
20 changed files with 1090 additions and 61 deletions

View File

@@ -9,6 +9,7 @@ import PCancelable from 'p-cancelable';
import type {
ExecutionError,
ExecutionStatus,
GenericValue,
IConnection,
IDataObject,
IExecuteData,
@@ -36,6 +37,8 @@ import {
IRunExecutionData,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
NodeHelpers,
NodeConnectionType,
} from 'n8n-workflow';
import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
@@ -1041,6 +1044,7 @@ export class WorkflowExecute {
node: executionNode.name,
workflowId: workflow.id,
});
const runNodeData = await workflow.runNode(
executionData,
this.runExecutionData,
@@ -1051,6 +1055,112 @@ export class WorkflowExecute {
);
nodeSuccessData = runNodeData.data;
if (nodeSuccessData && executionData.node.onError === 'continueErrorOutput') {
// If errorOutput is activated check all the output items for error data.
// If any is found, route them to the last output as that will be the
// error output.
const nodeType = workflow.nodeTypes.getByNameAndVersion(
executionData.node.type,
executionData.node.typeVersion,
);
const outputs = NodeHelpers.getNodeOutputs(
workflow,
executionData.node,
nodeType.description,
);
const outputTypes = NodeHelpers.getConnectionTypes(outputs);
const mainOutputTypes = outputTypes.filter(
(output) => output === NodeConnectionType.Main,
);
const errorItems: INodeExecutionData[] = [];
const successItems: INodeExecutionData[] = [];
// Create a WorkflowDataProxy instance that we can get the data of the
// item which did error
const executeFunctions = NodeExecuteFunctions.getExecuteFunctions(
workflow,
this.runExecutionData,
runIndex,
[],
executionData.data,
executionData.node,
this.additionalData,
executionData,
this.mode,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
// Loop over all outputs except the error output as it would not contain data by default
for (
let outputIndex = 0;
outputIndex < mainOutputTypes.length - 1;
outputIndex++
) {
successItems.length = 0;
const items = nodeSuccessData.length ? nodeSuccessData[0] : [];
while (items.length) {
const item = items.pop();
if (item === undefined) {
continue;
}
let errorData: GenericValue | undefined;
if (item.error) {
errorData = item.error;
item.error = undefined;
} else if (item.json.error && Object.keys(item.json).length === 1) {
errorData = item.json.error;
}
if (errorData) {
const pairedItemData =
item.pairedItem && typeof item.pairedItem === 'object'
? Array.isArray(item.pairedItem)
? item.pairedItem[0]
: item.pairedItem
: undefined;
if (executionData!.source === null || pairedItemData === undefined) {
// Source data is missing for some reason so we can not figure out the item
errorItems.push(item);
} else {
const pairedItemInputIndex = pairedItemData.input || 0;
const sourceData =
executionData!.source[NodeConnectionType.Main][pairedItemInputIndex];
const constPairedItem = dataProxy.$getPairedItem(
sourceData!.previousNode,
sourceData,
pairedItemData,
);
if (constPairedItem === null) {
errorItems.push(item);
} else {
errorItems.push({
...item,
json: {
...constPairedItem.json,
...item.json,
},
});
}
}
} else {
successItems.push(item);
}
}
nodeSuccessData[outputIndex] = successItems;
}
nodeSuccessData[mainOutputTypes.length - 1] = errorItems;
}
if (runNodeData.closeFunction) {
// Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode
@@ -1180,7 +1290,12 @@ export class WorkflowExecute {
taskData.error = executionError;
taskData.executionStatus = 'error';
if (executionData.node.continueOnFail === true) {
if (
executionData.node.continueOnFail === true ||
['continueRegularOutput', 'continueErrorOutput'].includes(
executionData.node.onError || '',
)
) {
// Workflow should continue running even if node errors
if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) {
// Simply get the input data of the node if it has any and pass it through