refactor(core): Move execution engine code out of n8n-workflow (no-changelog) (#12147)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-12-12 13:54:44 +01:00
committed by GitHub
parent 73f0c4cca9
commit 5a055ed526
44 changed files with 1995 additions and 1795 deletions

View File

@@ -22,11 +22,13 @@ import { Service } from 'typedi';
import { ErrorReporter } from './error-reporter';
import type { IWorkflowData } from './Interfaces';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { TriggersAndPollers } from './TriggersAndPollers';
@Service()
export class ActiveWorkflows {
constructor(
private readonly scheduledTaskManager: ScheduledTaskManager,
private readonly triggersAndPollers: TriggersAndPollers,
private readonly errorReporter: ErrorReporter,
) {}
@@ -78,7 +80,8 @@ export class ActiveWorkflows {
for (const triggerNode of triggerNodes) {
try {
triggerResponse = await workflow.runTrigger(
triggerResponse = await this.triggersAndPollers.runTrigger(
workflow,
triggerNode,
getTriggerFunctions,
additionalData,
@@ -153,7 +156,7 @@ export class ActiveWorkflows {
});
try {
const pollResponse = await workflow.runPoll(node, pollFunctions);
const pollResponse = await this.triggersAndPollers.runPoll(workflow, node, pollFunctions);
if (pollResponse !== null) {
pollFunctions.__emit(pollResponse);

View File

@@ -1,3 +1,6 @@
import type { INodeProperties } from 'n8n-workflow';
import { cronNodeOptions } from 'n8n-workflow';
export const CUSTOM_EXTENSION_ENV = 'N8N_CUSTOM_EXTENSIONS';
export const PLACEHOLDER_EMPTY_EXECUTION_ID = '__UNKNOWN__';
export const PLACEHOLDER_EMPTY_WORKFLOW_ID = '__EMPTY__';
@@ -12,3 +15,30 @@ export const CONFIG_FILES = 'N8N_CONFIG_FILES';
export const BINARY_DATA_STORAGE_PATH = 'N8N_BINARY_DATA_STORAGE_PATH';
export const UM_EMAIL_TEMPLATES_INVITE = 'N8N_UM_EMAIL_TEMPLATES_INVITE';
export const UM_EMAIL_TEMPLATES_PWRESET = 'N8N_UM_EMAIL_TEMPLATES_PWRESET';
export const commonPollingParameters: INodeProperties[] = [
{
displayName: 'Poll Times',
name: 'pollTimes',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
multipleValueButtonText: 'Add Poll Time',
},
default: { item: [{ mode: 'everyMinute' }] },
description: 'Time at which polling should occur',
placeholder: 'Add Poll Time',
options: cronNodeOptions,
},
];
export const commonCORSParameters: INodeProperties[] = [
{
displayName: 'Allowed Origins (CORS)',
name: 'allowedOrigins',
type: 'string',
default: '*',
description:
'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.',
},
];

View File

@@ -6,6 +6,7 @@ import type {
ICredentialType,
ICredentialTypeData,
INodeCredentialDescription,
INodePropertyOptions,
INodeType,
INodeTypeBaseDescription,
INodeTypeData,
@@ -14,13 +15,18 @@ import type {
IVersionedNodeType,
KnownNodesAndCredentials,
} from 'n8n-workflow';
import { ApplicationError, LoggerProxy as Logger, NodeHelpers, jsonParse } from 'n8n-workflow';
import {
ApplicationError,
LoggerProxy as Logger,
applyDeclarativeNodeOptionParameters,
jsonParse,
} from 'n8n-workflow';
import { readFileSync } from 'node:fs';
import { readFile } from 'node:fs/promises';
import * as path from 'path';
import { loadClassInIsolation } from './ClassLoader';
import { CUSTOM_NODES_CATEGORY } from './Constants';
import { commonCORSParameters, commonPollingParameters, CUSTOM_NODES_CATEGORY } from './Constants';
import { UnrecognizedCredentialTypeError } from './errors/unrecognized-credential-type.error';
import { UnrecognizedNodeTypeError } from './errors/unrecognized-node-type.error';
import type { n8n } from './Interfaces';
@@ -135,7 +141,7 @@ export abstract class DirectoryLoader {
for (const version of Object.values(tempNode.nodeVersions)) {
this.addLoadOptionsMethods(version);
NodeHelpers.applySpecialNodeParameters(version);
this.applySpecialNodeParameters(version);
}
const currentVersionNode = tempNode.nodeVersions[tempNode.currentVersion];
@@ -150,7 +156,7 @@ export abstract class DirectoryLoader {
}
} else {
this.addLoadOptionsMethods(tempNode);
NodeHelpers.applySpecialNodeParameters(tempNode);
this.applySpecialNodeParameters(tempNode);
// Short renaming to avoid type issues
nodeVersion = Array.isArray(tempNode.description.version)
@@ -346,6 +352,24 @@ export abstract class DirectoryLoader {
}
}
private applySpecialNodeParameters(nodeType: INodeType): void {
const { properties, polling, supportsCORS } = nodeType.description;
if (polling) {
properties.unshift(...commonPollingParameters);
}
if (nodeType.webhook && supportsCORS) {
const optionsProperty = properties.find(({ name }) => name === 'options');
if (optionsProperty)
optionsProperty.options = [
...commonCORSParameters,
...(optionsProperty.options as INodePropertyOptions[]),
];
else properties.push(...commonCORSParameters);
}
applyDeclarativeNodeOptionParameters(nodeType);
}
private getIconPath(icon: string, filePath: string) {
const iconPath = path.join(path.dirname(filePath), icon.replace('file:', ''));
return `icons/${this.packageName}/${iconPath}`;

View File

@@ -41,8 +41,6 @@ import type {
IDataObject,
IExecuteData,
IExecuteFunctions,
IExecuteSingleFunctions,
IHookFunctions,
IHttpRequestOptions,
IN8nHttpFullResponse,
IN8nHttpResponse,
@@ -56,9 +54,7 @@ import type {
IRunExecutionData,
ITaskDataConnections,
ITriggerFunctions,
IWebhookData,
IWebhookDescription,
IWebhookFunctions,
IWorkflowDataProxyAdditionalKeys,
IWorkflowExecuteAdditionalData,
NodeExecutionWithMetadata,
@@ -121,15 +117,7 @@ import { DataDeduplicationService } from './data-deduplication-service';
import { InstanceSettings } from './InstanceSettings';
import type { IResponseError } from './Interfaces';
// eslint-disable-next-line import/no-cycle
import {
ExecuteContext,
ExecuteSingleContext,
HookContext,
PollContext,
SupplyDataContext,
TriggerContext,
WebhookContext,
} from './node-execution-context';
import { PollContext, SupplyDataContext, TriggerContext } from './node-execution-context';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { SSHClientsManager } from './SSHClientsManager';
@@ -2720,68 +2708,6 @@ export function getExecuteTriggerFunctions(
return new TriggerContext(workflow, node, additionalData, mode, activation);
}
/**
* Returns the execute functions regular nodes have access to.
*/
export function getExecuteFunctions(
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
abortSignal?: AbortSignal,
): IExecuteFunctions {
return new ExecuteContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
closeFunctions,
abortSignal,
);
}
/**
* Returns the execute functions regular nodes have access to when single-function is defined.
*/
export function getExecuteSingleFunctions(
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
node: INode,
itemIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return new ExecuteSingleContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
itemIndex,
executeData,
abortSignal,
);
}
export function getCredentialTestFunctions(): ICredentialTestFunctions {
return {
helpers: {
@@ -2792,41 +2718,3 @@ export function getCredentialTestFunctions(): ICredentialTestFunctions {
},
};
}
/**
* Returns the execute functions regular nodes have access to in hook-function.
*/
export function getExecuteHookFunctions(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
webhookData?: IWebhookData,
): IHookFunctions {
return new HookContext(workflow, node, additionalData, mode, activation, webhookData);
}
/**
* Returns the execute functions regular nodes have access to when webhook-function is defined.
*/
// TODO: check where it is used and make sure close functions are called
export function getExecuteWebhookFunctions(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
webhookData: IWebhookData,
closeFunctions: CloseFunction[],
runExecutionData: IRunExecutionData | null,
): IWebhookFunctions {
return new WebhookContext(
workflow,
node,
additionalData,
mode,
webhookData,
closeFunctions,
runExecutionData,
);
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,116 @@
import { ApplicationError } from 'n8n-workflow';
import type {
Workflow,
INode,
INodeExecutionData,
IPollFunctions,
IGetExecuteTriggerFunctions,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
WorkflowActivateMode,
ITriggerResponse,
IDeferredPromise,
IExecuteResponsePromiseData,
IRun,
} from 'n8n-workflow';
import { Service } from 'typedi';
@Service()
export class TriggersAndPollers {
/**
* Runs the given trigger node so that it can trigger the workflow when the node has data.
*/
async runTrigger(
workflow: Workflow,
node: INode,
getTriggerFunctions: IGetExecuteTriggerFunctions,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<ITriggerResponse | undefined> {
const triggerFunctions = getTriggerFunctions(workflow, node, additionalData, mode, activation);
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if (!nodeType.trigger) {
throw new ApplicationError('Node type does not have a trigger function defined', {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
});
}
if (mode === 'manual') {
// In manual mode we do not just start the trigger function we also
// want to be able to get informed as soon as the first data got emitted
const triggerResponse = await nodeType.trigger.call(triggerFunctions);
// Add the manual trigger response which resolves when the first time data got emitted
triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
triggerFunctions.emit = (
(resolveEmit) =>
(
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun>,
) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
},
];
if (donePromise) {
additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift(
async (runData: IRun): Promise<void> => {
return donePromise.resolve(runData);
},
);
}
resolveEmit(data);
}
)(resolve);
triggerFunctions.emitError = (
(rejectEmit) =>
(error: Error, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (): Promise<void> => {
if (responsePromise) {
responsePromise.reject(error);
}
},
];
rejectEmit(error);
}
)(reject);
});
return triggerResponse;
}
// In all other modes simply start the trigger
return await nodeType.trigger.call(triggerFunctions);
}
/**
* Runs the given poller node so that it can trigger the workflow when the node has data.
*/
async runPoll(
workflow: Workflow,
node: INode,
pollFunctions: IPollFunctions,
): Promise<INodeExecutionData[][] | null> {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if (!nodeType.poll) {
throw new ApplicationError('Node type does not have a poll function defined', {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
});
}
return await nodeType.poll.call(pollFunctions);
}
}

View File

@@ -37,6 +37,9 @@ import type {
StartNodeData,
NodeExecutionHint,
NodeInputConnections,
IRunNodeResponse,
IWorkflowIssues,
INodeIssues,
} from 'n8n-workflow';
import {
LoggerProxy as Logger,
@@ -47,11 +50,13 @@ import {
NodeExecutionOutput,
sleep,
ExecutionCancelledError,
Node,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import Container from 'typedi';
import { ErrorReporter } from './error-reporter';
import { ExecuteContext, PollContext } from './node-execution-context';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import {
DirectedGraph,
@@ -63,6 +68,8 @@ import {
handleCycles,
filterDisabledNodes,
} from './PartialExecutionUtils';
import { RoutingNode } from './RoutingNode';
import { TriggersAndPollers } from './TriggersAndPollers';
export class WorkflowExecute {
private status: ExecutionStatus = 'new';
@@ -884,6 +891,280 @@ export class WorkflowExecute {
}
}
/**
* Checks if everything in the workflow is complete
* and ready to be executed. If it returns null everything
* is fine. If there are issues it returns the issues
* which have been found for the different nodes.
* TODO: Does currently not check for credential issues!
*/
checkReadyForExecution(
workflow: Workflow,
inputData: {
startNode?: string;
destinationNode?: string;
pinDataNodeNames?: string[];
} = {},
): IWorkflowIssues | null {
const workflowIssues: IWorkflowIssues = {};
let checkNodes: string[] = [];
if (inputData.destinationNode) {
// If a destination node is given we have to check all the nodes
// leading up to it
checkNodes = workflow.getParentNodes(inputData.destinationNode);
checkNodes.push(inputData.destinationNode);
} else if (inputData.startNode) {
// If a start node is given we have to check all nodes which
// come after it
checkNodes = workflow.getChildNodes(inputData.startNode);
checkNodes.push(inputData.startNode);
}
for (const nodeName of checkNodes) {
let nodeIssues: INodeIssues | null = null;
const node = workflow.nodes[nodeName];
if (node.disabled === true) {
continue;
}
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if (nodeType === undefined) {
// Node type is not known
nodeIssues = {
typeUnknown: true,
};
} else {
nodeIssues = NodeHelpers.getNodeParametersIssues(
nodeType.description.properties,
node,
inputData.pinDataNodeNames,
);
}
if (nodeIssues !== null) {
workflowIssues[node.name] = nodeIssues;
}
}
if (Object.keys(workflowIssues).length === 0) {
return null;
}
return workflowIssues;
}
/** Executes the given node */
// eslint-disable-next-line complexity
async runNode(
workflow: Workflow,
executionData: IExecuteData,
runExecutionData: IRunExecutionData,
runIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
if (node.disabled === true) {
// If node is disabled simply pass the data through
// return NodeRunHelpers.
if (inputData.hasOwnProperty('main') && inputData.main.length > 0) {
// If the node is disabled simply return the data from the first main input
if (inputData.main[0] === null) {
return { data: undefined };
}
return { data: [inputData.main[0]] };
}
return { data: undefined };
}
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
let connectionInputData: INodeExecutionData[] = [];
if (nodeType.execute || (!nodeType.poll && !nodeType.trigger && !nodeType.webhook)) {
// Only stop if first input is empty for execute runs. For all others run anyways
// because then it is a trigger node. As they only pass data through and so the input-data
// becomes output-data it has to be possible.
if (inputData.main?.length > 0) {
// We always use the data of main input and the first input for execute
connectionInputData = inputData.main[0] as INodeExecutionData[];
}
const forceInputNodeExecution = workflow.settings.executionOrder !== 'v1';
if (!forceInputNodeExecution) {
// If the nodes do not get force executed data of some inputs may be missing
// for that reason do we use the data of the first one that contains any
for (const mainData of inputData.main) {
if (mainData?.length) {
connectionInputData = mainData;
break;
}
}
}
if (connectionInputData.length === 0) {
// No data for node so return
return { data: undefined };
}
}
if (
runExecutionData.resultData.lastNodeExecuted === node.name &&
runExecutionData.resultData.error !== undefined
) {
// The node did already fail. So throw an error here that it displays and logs it correctly.
// Does get used by webhook and trigger nodes in case they throw an error that it is possible
// to log the error and display in Editor-UI.
if (
runExecutionData.resultData.error.name === 'NodeOperationError' ||
runExecutionData.resultData.error.name === 'NodeApiError'
) {
throw runExecutionData.resultData.error;
}
const error = new Error(runExecutionData.resultData.error.message);
error.stack = runExecutionData.resultData.error.stack;
throw error;
}
if (node.executeOnce === true) {
// If node should be executed only once so use only the first input item
const newInputData: ITaskDataConnections = {};
for (const connectionType of Object.keys(inputData)) {
newInputData[connectionType] = inputData[connectionType].map((input) => {
// eslint-disable-next-line @typescript-eslint/prefer-optional-chain
return input && input.slice(0, 1);
});
}
inputData = newInputData;
}
if (nodeType.execute) {
const closeFunctions: CloseFunction[] = [];
const context = new ExecuteContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executionData,
closeFunctions,
abortSignal,
);
const data =
nodeType instanceof Node
? await nodeType.execute(context)
: await nodeType.execute.call(context);
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => await fn()),
);
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
.map((result) => result.reason);
if (closingErrors.length > 0) {
if (closingErrors[0] instanceof Error) throw closingErrors[0];
throw new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
return { data };
} else if (nodeType.poll) {
if (mode === 'manual') {
// In manual mode run the poll function
const context = new PollContext(workflow, node, additionalData, mode, 'manual');
return { data: await nodeType.poll.call(context) };
}
// In any other mode pass data through as it already contains the result of the poll
return { data: inputData.main as INodeExecutionData[][] };
} else if (nodeType.trigger) {
if (mode === 'manual') {
// In manual mode start the trigger
const triggerResponse = await Container.get(TriggersAndPollers).runTrigger(
workflow,
node,
NodeExecuteFunctions.getExecuteTriggerFunctions,
additionalData,
mode,
'manual',
);
if (triggerResponse === undefined) {
return { data: null };
}
let closeFunction;
if (triggerResponse.closeFunction) {
// In manual mode we return the trigger closeFunction. That allows it to be called directly
// but we do not have to wait for it to finish. That is important for things like queue-nodes.
// There the full close will may be delayed till a message gets acknowledged after the execution.
// If we would not be able to wait for it to close would it cause problems with "own" mode as the
// process would be killed directly after it and so the acknowledge would not have been finished yet.
closeFunction = triggerResponse.closeFunction;
// Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers
abortSignal?.addEventListener('abort', closeFunction);
}
if (triggerResponse.manualTriggerFunction !== undefined) {
// If a manual trigger function is defined call it and wait till it did run
await triggerResponse.manualTriggerFunction();
}
const response = await triggerResponse.manualTriggerResponse!;
if (response.length === 0) {
return { data: null, closeFunction };
}
return { data: response, closeFunction };
}
// For trigger nodes in any mode except "manual" do we simply pass the data through
return { data: inputData.main as INodeExecutionData[][] };
} else if (nodeType.webhook) {
// For webhook nodes always simply pass the data through
return { data: inputData.main as INodeExecutionData[][] };
} else {
// For nodes which have routing information on properties
const routingNode = new RoutingNode(
workflow,
node,
connectionInputData,
runExecutionData ?? null,
additionalData,
mode,
);
return {
data: await routingNode.runNode(
inputData,
runIndex,
nodeType,
executionData,
undefined,
abortSignal,
),
};
}
}
/**
* Runs the given execution data.
*
@@ -909,7 +1190,7 @@ export class WorkflowExecute {
const pinDataNodeNames = Object.keys(this.runExecutionData.resultData.pinData ?? {});
const workflowIssues = workflow.checkReadyForExecution({
const workflowIssues = this.checkReadyForExecution(workflow, {
startNode,
destinationNode,
pinDataNodeNames,
@@ -1171,12 +1452,12 @@ export class WorkflowExecute {
workflowId: workflow.id,
});
let runNodeData = await workflow.runNode(
let runNodeData = await this.runNode(
workflow,
executionData,
this.runExecutionData,
runIndex,
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
@@ -1188,12 +1469,12 @@ export class WorkflowExecute {
while (didContinueOnFail && tryIndex !== maxTries - 1) {
await sleep(waitBetweenTries);
runNodeData = await workflow.runNode(
runNodeData = await this.runNode(
workflow,
executionData,
this.runExecutionData,
runIndex,
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
@@ -1230,19 +1511,20 @@ export class WorkflowExecute {
const closeFunctions: CloseFunction[] = [];
// Create a WorkflowDataProxy instance that we can get the data of the
// item which did error
const executeFunctions = NodeExecuteFunctions.getExecuteFunctions(
const executeFunctions = new ExecuteContext(
workflow,
executionData.node,
this.additionalData,
this.mode,
this.runExecutionData,
runIndex,
[],
executionData.data,
executionData.node,
this.additionalData,
executionData,
this.mode,
closeFunctions,
this.abortController.signal,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
// Loop over all outputs except the error output as it would not contain data by default

View File

@@ -11,6 +11,7 @@ export * from './DirectoryLoader';
export * from './Interfaces';
export { InstanceSettings, InstanceType } from './InstanceSettings';
export * from './NodeExecuteFunctions';
export * from './RoutingNode';
export * from './WorkflowExecute';
export { NodeExecuteFunctions };
export * from './data-deduplication-service';