diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 70d8e117dc..90a0b39660 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -20,6 +20,7 @@ import { IGetExecuteTriggerFunctions, INode, INodeExecutionData, + IRun, IRunExecutionData, IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow, NodeHelpers, @@ -52,6 +53,9 @@ import config from '../config'; import { User } from './databases/entities/User'; import { whereClause } from './WorkflowHelpers'; import { WorkflowEntity } from './databases/entities/WorkflowEntity'; +import * as ActiveExecutions from './ActiveExecutions'; + +const activeExecutions = ActiveExecutions.getInstance(); const WEBHOOK_PROD_UNREGISTERED_HINT = `The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)`; @@ -675,14 +679,31 @@ export class ActiveWorkflowRunner { returnFunctions.emit = ( data: INodeExecutionData[][], responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, ): void => { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions Logger.debug(`Received trigger for workflow "${workflow.name}"`); WorkflowHelpers.saveStaticData(workflow); // eslint-disable-next-line id-denylist - this.runWorkflow(workflowData, node, data, additionalData, mode, responsePromise).catch( - (error) => console.error(error), + const executePromise = this.runWorkflow( + workflowData, + node, + data, + additionalData, + mode, + responsePromise, ); + + if (donePromise) { + executePromise.then((executionId) => { + activeExecutions + .getPostExecutePromise(executionId) + .then(donePromise.resolve) + .catch(donePromise.reject); + }); + } else { + executePromise.catch(console.error); + } }; returnFunctions.emitError = async (error: Error): Promise => { await this.activeWorkflows?.remove(workflowData.id.toString()); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 1c799ac53f..9b36f1d517 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -627,6 +627,7 @@ export class WorkflowExecute { let currentExecutionTry = ''; let lastExecutionTry = ''; + let closeFunction: Promise | undefined; return new PCancelable(async (resolve, reject, onCancel) => { let gotCancel = false; @@ -811,7 +812,7 @@ export class WorkflowExecute { node: executionNode.name, workflowId: workflow.id, }); - nodeSuccessData = await workflow.runNode( + const runNodeData = await workflow.runNode( executionData.node, executionData.data, this.runExecutionData, @@ -820,6 +821,14 @@ export class WorkflowExecute { NodeExecuteFunctions, this.mode, ); + nodeSuccessData = runNodeData.data; + + if (runNodeData.closeFunction) { + // Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + closeFunction = runNodeData.closeFunction(); + } + Logger.debug(`Running node "${executionNode.name}" finished successfully`, { node: executionNode.name, workflowId: workflow.id, @@ -1033,9 +1042,10 @@ export class WorkflowExecute { startedAt, workflow, new WorkflowOperationError('Workflow has been canceled or timed out!'), + closeFunction, ); } - return this.processSuccessExecution(startedAt, workflow, executionError); + return this.processSuccessExecution(startedAt, workflow, executionError, closeFunction); }) .catch(async (error) => { const fullRunData = this.getFullRunData(startedAt); @@ -1061,6 +1071,20 @@ export class WorkflowExecute { }, ); + if (closeFunction) { + try { + await closeFunction; + } catch (errorClose) { + Logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/restrict-template-expressions + `There was a problem deactivating trigger of workflow "${workflow.id}": "${errorClose.message}"`, + { + workflowId: workflow.id, + }, + ); + } + } + return fullRunData; }); @@ -1072,6 +1096,7 @@ export class WorkflowExecute { startedAt: Date, workflow: Workflow, executionError?: ExecutionError, + closeFunction?: Promise, ): Promise { const fullRunData = this.getFullRunData(startedAt); @@ -1106,6 +1131,20 @@ export class WorkflowExecute { await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + if (closeFunction) { + try { + await closeFunction; + } catch (error) { + Logger.error( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `There was a problem deactivating trigger of workflow "${workflow.id}": "${error.message}"`, + { + workflowId: workflow.id, + }, + ); + } + } + return fullRunData; } diff --git a/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts b/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts index ffe9bc4d07..1c126e6c2d 100644 --- a/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts @@ -68,7 +68,7 @@ export const rabbitDefaultOptions: Array { // tslint:disable-line:no-any +declare module 'amqplib' { + interface Channel { + connection: amqplib.Connection; + } +} + +export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise { const credentials = await this.getCredentials('rabbitmq'); const credentialKeys = [ @@ -44,7 +50,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction reject(error); }); - const channel = await connection.createChannel().catch(console.warn); + const channel = await connection.createChannel().catch(console.warn) as amqplib.Channel; if (options.arguments && ((options.arguments as IDataObject).argument! as IDataObject[]).length) { const additionalArguments: IDataObject = {}; @@ -54,7 +60,6 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction options.arguments = additionalArguments; } - resolve(channel); } catch (error) { reject(error); @@ -62,7 +67,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction }); } -export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise { // tslint:disable-line:no-any +export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise { const channel = await rabbitmqConnect.call(this, options); return new Promise(async (resolve, reject) => { @@ -75,7 +80,7 @@ export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFun }); } -export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise { // tslint:disable-line:no-any +export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise { const channel = await rabbitmqConnect.call(this, options); return new Promise(async (resolve, reject) => { @@ -87,3 +92,53 @@ export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITrigger } }); } + +export class MessageTracker { + messages: number[] = []; + isClosing = false; + + received(message: amqplib.ConsumeMessage) { + this.messages.push(message.fields.deliveryTag); + } + + answered(message: amqplib.ConsumeMessage) { + if (this.messages.length === 0) { + return; + } + + const index = this.messages.findIndex(value => value !== message.fields.deliveryTag); + this.messages.splice(index); + } + + unansweredMessages() { + return this.messages.length; + } + + async closeChannel(channel: amqplib.Channel, consumerTag: string) { + if (this.isClosing) { + return; + } + this.isClosing = true; + + // Do not accept any new messages + await channel.cancel(consumerTag); + + let count = 0; + let unansweredMessages = this.unansweredMessages(); + + // Give currently executing messages max. 5 minutes to finish before + // the channel gets closed. If we would not do that, it would not be possible + // to acknowledge messages anymore for which the executions were already running + // when for example a new version of the workflow got saved. That would lead to + // them getting delivered and processed again. + while (unansweredMessages !== 0 && count++ <= 300) { + await new Promise((resolve) => { + setTimeout(resolve, 1000); + }); + unansweredMessages = this.unansweredMessages(); + } + + await channel.close(); + await channel.connection.close(); + } +} diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index ca12dc4012..2491592fa6 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -225,7 +225,7 @@ export class RabbitMQ implements INodeType { ], }, { - displayName: 'Auto Delete', + displayName: 'Auto Delete Queue', name: 'autoDelete', type: 'boolean', default: false, diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index 1074830e08..4912c4b994 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -1,11 +1,14 @@ import { + createDeferredPromise, IDataObject, INodeExecutionData, INodeProperties, INodeType, INodeTypeDescription, + IRun, ITriggerFunctions, ITriggerResponse, + LoggerProxy as Logger, } from 'n8n-workflow'; import { @@ -13,9 +16,12 @@ import { } from './DefaultOptions'; import { + MessageTracker, rabbitmqConnectQueue, } from './GenericFunctions'; +import * as amqplib from 'amqplib'; + export class RabbitMQTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'RabbitMQ Trigger', @@ -42,7 +48,7 @@ export class RabbitMQTrigger implements INodeType { type: 'string', default: '', placeholder: 'queue-name', - description: 'Name of the queue to publish to', + description: 'The name of the queue to read from', }, { @@ -59,6 +65,30 @@ export class RabbitMQTrigger implements INodeType { default: false, description: 'Saves the content as binary', }, + { + displayName: 'Delete from queue when', + name: 'acknowledge', + type: 'options', + options: [ + { + name: 'Execution finishes', + value: 'executionFinishes', + description: 'After the workflow execution finished. No matter if the execution was successful or not.', + }, + { + name: 'Execution finishes successfully', + value: 'executionFinishesSuccessfully', + description: 'After the workflow execution finished successfully', + }, + { + name: 'Immediately', + value: 'immediately', + description: 'As soon as the message got received', + }, + ], + default: 'immediately', + description: 'When to acknowledge the message', + }, { displayName: 'JSON Parse Body', name: 'jsonParseBody', @@ -87,6 +117,21 @@ export class RabbitMQTrigger implements INodeType { default: false, description: 'Returns only the content property', }, + // eslint-disable-next-line n8n-nodes-base/node-param-default-missing + { + displayName: 'Parallel message processing limit', + name: 'parallelMessages', + type: 'number', + default: -1, + displayOptions: { + hide: { + acknowledge: [ + 'immediately', + ], + }, + }, + description: 'Max number of executions at a time. Use -1 for no limit.', + }, ...rabbitDefaultOptions, ].sort((a, b) => { if ((a as INodeProperties).displayName.toLowerCase() < (b as INodeProperties).displayName.toLowerCase()) { return -1; } @@ -106,42 +151,117 @@ export class RabbitMQTrigger implements INodeType { const self = this; + let parallelMessages = (options.parallelMessages !== undefined && options.parallelMessages !== -1) ? parseInt(options.parallelMessages as string, 10) : -1; + + if (parallelMessages === 0 || parallelMessages < -1) { + throw new Error('Parallel message processing limit must be greater than zero (or -1 for no limit)'); + } + + if (this.getMode() === 'manual') { + // Do only catch a single message when executing manually, else messages will leak + parallelMessages = 1; + } + + let acknowledgeMode = options.acknowledge ? options.acknowledge : 'immediately'; + + if (parallelMessages !== -1 && acknowledgeMode === 'immediately') { + // If parallel message limit is set, then the default mode is "executionFinishes" + // unless acknowledgeMode got set specifically. Be aware that the mode "immediately" + // can not be supported in this case. + acknowledgeMode = 'executionFinishes'; + } + + const messageTracker = new MessageTracker(); + let consumerTag: string; + const startConsumer = async () => { - await channel.consume(queue, async (message: IDataObject) => { + if (parallelMessages !== -1) { + channel.prefetch(parallelMessages); + } + + const consumerInfo = await channel.consume(queue, async (message) => { if (message !== null) { - let content: IDataObject | string = message!.content!.toString(); - const item: INodeExecutionData = { - json: {}, - }; + try { + if (acknowledgeMode !== 'immediately') { + messageTracker.received(message); + } - if (options.contentIsBinary === true) { - item.binary = { - data: await this.helpers.prepareBinaryData(message.content), + let content: IDataObject | string = message!.content!.toString(); + + const item: INodeExecutionData = { + json: {}, }; - item.json = message; - message.content = undefined; - } else { - if (options.jsonParseBody === true) { - content = JSON.parse(content as string); - } - if (options.onlyContent === true) { - item.json = content as IDataObject; - } else { - message.content = content; - item.json = message; - } - } + if (options.contentIsBinary === true) { + item.binary = { + data: await this.helpers.prepareBinaryData(message.content), + }; - self.emit([ - [ - item, - ], - ]); - channel.ack(message); + item.json = message as unknown as IDataObject; + message.content = undefined as unknown as Buffer; + } else { + if (options.jsonParseBody === true) { + content = JSON.parse(content as string); + } + if (options.onlyContent === true) { + item.json = content as IDataObject; + } else { + message.content = content as unknown as Buffer; + item.json = message as unknown as IDataObject; + } + } + + let responsePromise = undefined; + if (acknowledgeMode !== 'immediately') { + responsePromise = await createDeferredPromise(); + } + + self.emit([ + [ + item, + ], + ], undefined, responsePromise); + + if (responsePromise) { + // Acknowledge message after the execution finished + await responsePromise + .promise() + .then(async (data: IRun) => { + if (data.data.resultData.error) { + // The execution did fail + if (acknowledgeMode === 'executionFinishesSuccessfully') { + channel.nack(message); + messageTracker.answered(message); + return; + } + } + + channel.ack(message); + messageTracker.answered(message); + }); + } else { + // Acknowledge message directly + channel.ack(message); + } + + } catch (error) { + const workflow = this.getWorkflow(); + const node = this.getNode(); + if (acknowledgeMode !== 'immediately') { + messageTracker.answered(message); + } + + Logger.error(`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`, + { + node: node.name, + workflowId: workflow.id, + }, + ); + } } }); + consumerTag = consumerInfo.consumerTag; }; startConsumer(); @@ -149,23 +269,23 @@ export class RabbitMQTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. async function closeFunction() { - await channel.close(); - await channel.connection.close(); - } - // The "manualTriggerFunction" function gets called by n8n - // when a user is in the workflow editor and starts the - // workflow manually. So the function has to make sure that - // the emit() gets called with similar data like when it - // would trigger by itself so that the user knows what data - // to expect. - async function manualTriggerFunction() { - startConsumer(); + try { + return messageTracker.closeChannel(channel, consumerTag); + } catch(error) { + const workflow = self.getWorkflow(); + const node = self.getNode(); + Logger.error(`There was a problem closing the RabbitMQ Trigger node connection "${node.name}" in workflow "${workflow.id}": "${error.message}"`, + { + node: node.name, + workflowId: workflow.id, + }, + ); + } } return { closeFunction, - manualTriggerFunction, }; } diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index cb1810385a..cb9ae25f7c 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -685,6 +685,7 @@ ] }, "devDependencies": { + "@types/amqplib": "^0.8.2", "@types/aws4": "^1.5.1", "@types/basic-auth": "^1.1.2", "@types/cheerio": "^0.22.15", diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 92a8e7b115..c3a0513b99 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -350,6 +350,10 @@ export interface IGetExecuteTriggerFunctions { ): ITriggerFunctions; } +export interface IRunNodeResponse { + data: INodeExecutionData[][] | null | undefined; + closeFunction?: () => Promise; +} export interface IGetExecuteFunctions { ( workflow: Workflow, @@ -690,6 +694,7 @@ export interface ITriggerFunctions { emit( data: INodeExecutionData[][], responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, ): void; emitError(error: Error, responsePromise?: IDeferredPromise): void; getCredentials(type: string): Promise; diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index b772c20c6d..1f0ce3e11d 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -46,7 +46,14 @@ import { WorkflowExecuteMode, } from '.'; -import { IConnection, IDataObject, IConnectedNode, IObservableObject } from './Interfaces'; +import { + IConnection, + IDataObject, + IConnectedNode, + IObservableObject, + IRun, + IRunNodeResponse, +} from './Interfaces'; function dedupe(arr: T[]): T[] { return [...new Set(arr)]; @@ -1040,6 +1047,7 @@ export class Workflow { ( data: INodeExecutionData[][], responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, ) => { additionalData.hooks!.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { @@ -1049,6 +1057,14 @@ export class Workflow { }, ]; + if (donePromise) { + additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift( + async (runData: IRun): Promise => { + return donePromise.resolve(runData); + }, + ); + } + resolveEmit(data); } )(resolve); @@ -1159,18 +1175,18 @@ export class Workflow { additionalData: IWorkflowExecuteAdditionalData, nodeExecuteFunctions: INodeExecuteFunctions, mode: WorkflowExecuteMode, - ): Promise { + ): Promise { if (node.disabled === true) { // If node is disabled simply pass the data through // return NodeRunHelpers. if (inputData.hasOwnProperty('main') && inputData.main.length > 0) { // If the node is disabled simply return the data from the first main input if (inputData.main[0] === null) { - return undefined; + return { data: undefined }; } - return [inputData.main[0]]; + return { data: [inputData.main[0]] }; } - return undefined; + return { data: undefined }; } const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); @@ -1195,7 +1211,7 @@ export class Workflow { if (connectionInputData.length === 0) { // No data for node so return - return undefined; + return { data: undefined }; } } @@ -1245,7 +1261,7 @@ export class Workflow { } if (returnPromises.length === 0) { - return null; + return { data: null }; } let promiseResults; @@ -1256,7 +1272,7 @@ export class Workflow { } if (promiseResults) { - return [promiseResults]; + return { data: [promiseResults] }; } } else if (nodeType.execute) { const thisArgs = nodeExecuteFunctions.getExecuteFunctions( @@ -1269,7 +1285,7 @@ export class Workflow { additionalData, mode, ); - return nodeType.execute.call(thisArgs); + return { data: await nodeType.execute.call(thisArgs) }; } else if (nodeType.poll) { if (mode === 'manual') { // In manual mode run the poll function @@ -1280,10 +1296,10 @@ export class Workflow { mode, 'manual', ); - return nodeType.poll.call(thisArgs); + return { data: await nodeType.poll.call(thisArgs) }; } // In any other mode pass data through as it already contains the result of the poll - return inputData.main as INodeExecutionData[][]; + return { data: inputData.main as INodeExecutionData[][] }; } else if (nodeType.trigger) { if (mode === 'manual') { // In manual mode start the trigger @@ -1296,7 +1312,7 @@ export class Workflow { ); if (triggerResponse === undefined) { - return null; + return { data: null }; } if (triggerResponse.manualTriggerFunction !== undefined) { @@ -1306,22 +1322,27 @@ export class Workflow { const response = await triggerResponse.manualTriggerResponse!; - // And then close it again after it did execute + let closeFunction; if (triggerResponse.closeFunction) { - await triggerResponse.closeFunction(); + // In manual mode we return the trigger closeFunction. That allows it to be called directly + // but we do not have to wait for it to finish. That is important for things like queue-nodes. + // There the full close will may be delayed till a message gets acknowledged after the execution. + // If we would not be able to wait for it to close would it cause problems with "own" mode as the + // process would be killed directly after it and so the acknowledge would not have been finished yet. + closeFunction = triggerResponse.closeFunction; } if (response.length === 0) { - return null; + return { data: null, closeFunction }; } - return response; + return { data: response, closeFunction }; } // For trigger nodes in any mode except "manual" do we simply pass the data through - return inputData.main as INodeExecutionData[][]; + return { data: inputData.main as INodeExecutionData[][] }; } else if (nodeType.webhook) { // For webhook nodes always simply pass the data through - return inputData.main as INodeExecutionData[][]; + return { data: inputData.main as INodeExecutionData[][] }; } else { // For nodes which have routing information on properties @@ -1334,9 +1355,11 @@ export class Workflow { mode, ); - return routingNode.runNode(inputData, runIndex, nodeType, nodeExecuteFunctions); + return { + data: await routingNode.runNode(inputData, runIndex, nodeType, nodeExecuteFunctions), + }; } - return null; + return { data: null }; } }