diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index 573014cf62..d7af9ac794 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -127,11 +127,22 @@ export class Worker extends Command { staticData = workflowData.staticData; } + let workflowTimeout = config.get('executions.timeout') as number; // initialize with default + if (currentExecutionDb.workflowData.settings && currentExecutionDb.workflowData.settings.executionTimeout) { + workflowTimeout = currentExecutionDb.workflowData.settings!.executionTimeout as number; // preference on workflow setting + } + + let executionTimeoutTimestamp: number | undefined; + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number); + executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; + } + const workflow = new Workflow({ id: currentExecutionDb.workflowData.id as string, name: currentExecutionDb.workflowData.name, nodes: currentExecutionDb.workflowData!.nodes, connections: currentExecutionDb.workflowData!.connections, active: currentExecutionDb.workflowData!.active, nodeTypes, staticData, settings: currentExecutionDb.workflowData!.settings }); const credentials = await WorkflowCredentials(currentExecutionDb.workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials, undefined, executionTimeoutTimestamp); additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string }); let workflowExecute: WorkflowExecute; diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index ceee653a15..464d140757 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -11,11 +11,12 @@ import { ITaskData, IWorkflowBase as IWorkflowBaseWorkflow, IWorkflowCredentials, + Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; import { - IDeferredPromise, + IDeferredPromise, WorkflowExecute, } from 'n8n-core'; import * as PCancelable from 'p-cancelable'; @@ -410,3 +411,9 @@ export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExe executionId: string; nodeTypeData: ITransferNodeTypes; } + +export interface IWorkflowExecuteProcess { + startedAt: Date, + workflow: Workflow; + workflowExecute: WorkflowExecute; +} diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 970835a90c..8e0793385a 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -8,6 +8,7 @@ import { IExecutionResponse, IPushDataExecutionFinished, IWorkflowBase, + IWorkflowExecuteProcess, IWorkflowExecutionDataProcess, NodeTypes, Push, @@ -569,7 +570,7 @@ export async function getWorkflowData(workflowInfo: IExecuteWorkflowInfo): Promi * @param {INodeExecutionData[]} [inputData] * @returns {(Promise>)} */ -export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: IWorkflowExecutionDataProcess): Promise | IRun> { +export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: IWorkflowExecutionDataProcess): Promise | IWorkflowExecuteProcess> { const externalHooks = ExternalHooks(); await externalHooks.init(); @@ -605,10 +606,19 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi // This one already contains changes to talk to parent process // and get executionID from `activeExecutions` running on main process additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow; + additionalDataIntegrated.executionTimeoutTimestamp = additionalData.executionTimeoutTimestamp; // Execute the workflow const workflowExecute = new WorkflowExecute(additionalDataIntegrated, runData.executionMode, runExecutionData); + if (parentExecutionId !== undefined) { + // Must be changed to become typed + return { + startedAt: new Date(), + workflow, + workflowExecute, + }; + } const data = await workflowExecute.processRunExecutionData(workflow); await externalHooks.run('workflow.postExecute', [data, workflowData]); @@ -616,14 +626,9 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi if (data.finished === true) { // Workflow did finish successfully - if (parentExecutionId !== undefined) { - return data; - } else { - await ActiveExecutions.getInstance().remove(executionId, data); - - const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); - return returnData!.data!.main; - } + await ActiveExecutions.getInstance().remove(executionId, data); + const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); + return returnData!.data!.main; } else { await ActiveExecutions.getInstance().remove(executionId, data); // Workflow did fail @@ -644,7 +649,7 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi * @param {INodeParameters} currentNodeParameters * @returns {Promise} */ -export async function getBase(credentials: IWorkflowCredentials, currentNodeParameters?: INodeParameters): Promise { +export async function getBase(credentials: IWorkflowCredentials, currentNodeParameters?: INodeParameters, executionTimeoutTimestamp?: number): Promise { const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const timezone = config.get('generic.timezone') as string; @@ -666,6 +671,7 @@ export async function getBase(credentials: IWorkflowCredentials, currentNodePara webhookBaseUrl, webhookTestBaseUrl, currentNodeParameters, + executionTimeoutTimestamp, }; } diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 386f1c51f7..b86df1c122 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -158,8 +158,22 @@ export class WorkflowRunner { const nodeTypes = NodeTypes(); + + // Soft timeout to stop workflow execution after current running node + // Changes were made by adding the `workflowTimeout` to the `additionalData` + // So that the timeout will also work for executions with nested workflows. + let executionTimeout: NodeJS.Timeout; + let workflowTimeout = config.get('executions.timeout') as number; // initialize with default + if (data.workflowData.settings && data.workflowData.settings.executionTimeout) { + workflowTimeout = data.workflowData.settings!.executionTimeout as number; // preference on workflow setting + } + + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number); + } + const workflow = new Workflow({ id: data.workflowData.id as string | undefined, name: data.workflowData.name, nodes: data.workflowData!.nodes, connections: data.workflowData!.connections, active: data.workflowData!.active, nodeTypes, staticData: data.workflowData!.staticData }); - const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials, undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000); // Register the active execution const executionId = await this.activeExecutions.add(data, undefined); @@ -184,12 +198,6 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); - // Soft timeout to stop workflow execution after current running node - let executionTimeout: NodeJS.Timeout; - let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default - if (data.workflowData.settings && data.workflowData.settings.executionTimeout) { - workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number; // preference on workflow setting - } if (workflowTimeout) { const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds @@ -280,7 +288,6 @@ export class WorkflowRunner { * the database. * *************************************************/ let watchDogInterval: NodeJS.Timeout | undefined; - let resolved = false; const watchDog = new Promise((res) => { watchDogInterval = setInterval(async () => { @@ -301,28 +308,9 @@ export class WorkflowRunner { } }; - await new Promise((res, rej) => { - jobData.then((data) => { - if (!resolved) { - resolved = true; - clearWatchdogInterval(); - res(data); - } - }).catch((e) => { - if(!resolved) { - resolved = true; - clearWatchdogInterval(); - rej(e); - } - }); - watchDog.then((data) => { - if (!resolved) { - resolved = true; - clearWatchdogInterval(); - res(data); - } - }); - }); + await Promise.race([jobData, watchDog]); + clearWatchdogInterval(); + } else { await jobData; } @@ -364,7 +352,7 @@ export class WorkflowRunner { // We don't want errors here to crash n8n. Just log and proceed. console.log('Error removing saved execution from database. More details: ', err); } - + resolve(runData); }); @@ -383,7 +371,7 @@ export class WorkflowRunner { * @memberof WorkflowRunner */ async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { - const startedAt = new Date(); + let startedAt = new Date(); const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); if (loadStaticData === true && data.workflowData.id) { @@ -426,7 +414,6 @@ export class WorkflowRunner { } } - (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; (data as unknown as IWorkflowExecutionDataProcessWithExecution).nodeTypeData = nodeTypeData; (data as unknown as IWorkflowExecutionDataProcessWithExecution).credentialsOverwrite = credentialsOverwrites; @@ -439,24 +426,40 @@ export class WorkflowRunner { // Start timeout for the execution let executionTimeout: NodeJS.Timeout; - let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default + let workflowTimeout = config.get('executions.timeout') as number; // initialize with default if (data.workflowData.settings && data.workflowData.settings.executionTimeout) { - workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number; // preference on workflow setting + workflowTimeout = data.workflowData.settings!.executionTimeout as number; // preference on workflow setting } - if (workflowTimeout) { - const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds - executionTimeout = setTimeout(() => { - this.activeExecutions.stopExecution(executionId, 'timeout'); - - executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds - }, timeout); + const processTimeoutFunction = (timeout: number) => { + this.activeExecutions.stopExecution(executionId, 'timeout'); + executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds } + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds + // Start timeout already now but give process at least 5 seconds to start. + // Without it could would it be possible that the workflow executions times out before it even got started if + // the timeout time is very short as the process start time can be quite long. + executionTimeout = setTimeout(processTimeoutFunction, Math.max(5000, workflowTimeout), workflowTimeout); + } + + // Create a list of child spawned executions + // If after the child process exits we have + // outstanding executions, we remove them + const childExecutionIds: string[] = []; // Listen to data from the subprocess subprocess.on('message', async (message: IProcessMessage) => { - if (message.type === 'end') { + if (message.type === 'start') { + // Now that the execution actually started set the timeout again so that does not time out to early. + startedAt = new Date(); + if (workflowTimeout > 0) { + clearTimeout(executionTimeout); + executionTimeout = setTimeout(processTimeoutFunction, workflowTimeout, workflowTimeout); + } + + } else if (message.type === 'end') { clearTimeout(executionTimeout); this.activeExecutions.remove(executionId!, message.data.runData); @@ -474,14 +477,20 @@ export class WorkflowRunner { this.processError(timeoutError, startedAt, data.executionMode, executionId); } else if (message.type === 'startExecution') { const executionId = await this.activeExecutions.add(message.data.runData); + childExecutionIds.push(executionId); subprocess.send({ type: 'executionId', data: {executionId} } as IProcessMessage); } else if (message.type === 'finishExecution') { + const executionIdIndex = childExecutionIds.indexOf(message.data.executionId); + if (executionIdIndex !== -1) { + childExecutionIds.splice(executionIdIndex, 1); + } + await this.activeExecutions.remove(message.data.executionId, message.data.result); } }); // Also get informed when the processes does exit especially when it did crash or timed out - subprocess.on('exit', (code, signal) => { + subprocess.on('exit', async (code, signal) => { if (signal === 'SIGTERM'){ // Execution timed out and its process has been terminated const timeoutError = new WorkflowOperationError('Workflow execution timed out!'); @@ -493,6 +502,17 @@ export class WorkflowRunner { this.processError(executionError, startedAt, data.executionMode, executionId); } + + for(const executionId of childExecutionIds) { + // When the child process exits, if we still have + // pending child executions, we mark them as finished + // They will display as unknown to the user + // Instead of pending forever as executing when it + // actually isn't anymore. + await this.activeExecutions.remove(executionId); + } + + clearTimeout(executionTimeout); }); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index b263fa864f..f3c381fb7d 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -4,6 +4,7 @@ import { CredentialTypes, Db, ExternalHooks, + IWorkflowExecuteProcess, IWorkflowExecutionDataProcessWithExecution, NodeTypes, WorkflowExecuteAdditionalData, @@ -40,6 +41,9 @@ export class WorkflowRunnerProcess { workflow: Workflow | undefined; workflowExecute: WorkflowExecute | undefined; executionIdCallback: (executionId: string) => void | undefined; + childExecutions: { + [key: string]: IWorkflowExecuteProcess, + } = {}; static async stopProcess() { setTimeout(() => { @@ -107,8 +111,18 @@ export class WorkflowRunnerProcess { await Db.init(); } + // Start timeout for the execution + let workflowTimeout = config.get('executions.timeout') as number; // initialize with default + if (this.data.workflowData.settings && this.data.workflowData.settings.executionTimeout) { + workflowTimeout = this.data.workflowData.settings!.executionTimeout as number; // preference on workflow setting + } + + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number); + } + this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings }); - const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials, undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000); additionalData.hooks = this.getProcessForwardHooks(); const executeWorkflowFunction = additionalData.executeWorkflow; @@ -123,14 +137,20 @@ export class WorkflowRunnerProcess { }); let result: IRun; try { - result = await executeWorkflowFunction(workflowInfo, additionalData, inputData, executionId, workflowData, runData); + const executeWorkflowFunctionOutput = await executeWorkflowFunction(workflowInfo, additionalData, inputData, executionId, workflowData, runData) as {workflowExecute: WorkflowExecute, workflow: Workflow} as IWorkflowExecuteProcess; + const workflowExecute = executeWorkflowFunctionOutput.workflowExecute; + this.childExecutions[executionId] = executeWorkflowFunctionOutput; + const workflow = executeWorkflowFunctionOutput.workflow; + result = await workflowExecute.processRunExecutionData(workflow) as IRun; + await externalHooks.run('workflow.postExecute', [result, workflowData]); + await sendToParentProcess('finishExecution', { executionId, result }); + delete this.childExecutions[executionId]; } catch (e) { await sendToParentProcess('finishExecution', { executionId }); - // Throw same error we had - throw e; + delete this.childExecutions[executionId]; + // Throw same error we had + throw e; } - - await sendToParentProcess('finishExecution', { executionId, result }); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result); return returnData!.data!.main; @@ -254,6 +274,8 @@ const workflowRunner = new WorkflowRunnerProcess(); process.on('message', async (message: IProcessMessage) => { try { if (message.type === 'startWorkflow') { + await sendToParentProcess('start', {}); + const runData = await workflowRunner.runWorkflow(message.data); await sendToParentProcess('end', { @@ -267,6 +289,18 @@ process.on('message', async (message: IProcessMessage) => { let runData: IRun; if (workflowRunner.workflowExecute !== undefined) { + + const executionIds = Object.keys(workflowRunner.childExecutions); + + for (const executionId of executionIds) { + const childWorkflowExecute = workflowRunner.childExecutions[executionId]; + runData = childWorkflowExecute.workflowExecute.getFullRunData(workflowRunner.childExecutions[executionId].startedAt); + const timeOutError = message.type === 'timeout' ? new WorkflowOperationError('Workflow execution timed out!') : undefined; + + // If there is any data send it to parent process, if execution timedout add the error + await childWorkflowExecute.workflowExecute.processSuccessExecution(workflowRunner.childExecutions[executionId].startedAt, childWorkflowExecute.workflow, timeOutError); + } + // Workflow started already executing runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 210711aee7..40fe227d33 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -550,6 +550,10 @@ export class WorkflowExecute { executionLoop: while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) { + if (this.additionalData.executionTimeoutTimestamp !== undefined && Date.now() >= this.additionalData.executionTimeoutTimestamp) { + gotCancel = true; + } + // @ts-ignore if (gotCancel === true) { return Promise.resolve(); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 800dbb35a4..ba78499b5a 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -748,6 +748,7 @@ export interface IWorkflowExecuteAdditionalData { webhookBaseUrl: string; webhookTestBaseUrl: string; currentNodeParameters?: INodeParameters; + executionTimeoutTimestamp?: number; } export type WorkflowExecuteMode = 'cli' | 'error' | 'integrated' | 'internal' | 'manual' | 'retry' | 'trigger' | 'webhook';