diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts index 83999ca86d..86832565a4 100644 --- a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -1,8 +1,11 @@ import * as amqplib from 'amqplib'; import type { + IDeferredPromise, + IExecuteResponsePromiseData, IDataObject, IExecuteFunctions, INodeExecutionData, + IRun, ITriggerFunctions, } from 'n8n-workflow'; import { jsonParse, sleep } from 'n8n-workflow'; @@ -114,11 +117,11 @@ export class MessageTracker { isClosing = false; - received(message: amqplib.ConsumeMessage) { + received(message: amqplib.Message) { this.messages.push(message.fields.deliveryTag); } - answered(message: amqplib.ConsumeMessage) { + answered(message: amqplib.Message) { if (this.messages.length === 0) { return; } @@ -131,14 +134,16 @@ export class MessageTracker { return this.messages.length; } - async closeChannel(channel: amqplib.Channel, consumerTag: string) { + async closeChannel(channel: amqplib.Channel, consumerTag?: string) { if (this.isClosing) { return; } this.isClosing = true; // Do not accept any new messages - await channel.cancel(consumerTag); + if (consumerTag) { + await channel.cancel(consumerTag); + } let count = 0; let unansweredMessages = this.unansweredMessages(); @@ -195,3 +200,70 @@ export const parseMessage = async ( } } }; + +export async function handleMessage( + this: ITriggerFunctions, + message: amqplib.Message, + channel: amqplib.Channel, + messageTracker: MessageTracker, + acknowledgeMode: string, + options: TriggerOptions, +) { + try { + if (acknowledgeMode !== 'immediately') { + messageTracker.received(message); + } + + const item = await parseMessage(message, options, this.helpers); + + let responsePromise: IDeferredPromise | undefined = undefined; + let responsePromiseHook: IDeferredPromise | undefined = undefined; + if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { + responsePromise = this.helpers.createDeferredPromise(); + } else if (acknowledgeMode === 'laterMessageNode') { + responsePromiseHook = this.helpers.createDeferredPromise(); + } + if (responsePromiseHook) { + this.emit([[item]], responsePromiseHook, undefined); + } else { + this.emit([[item]], undefined, responsePromise); + } + if (responsePromise && acknowledgeMode !== 'laterMessageNode') { + // Acknowledge message after the execution finished + await responsePromise.promise.then(async (data: IRun) => { + if (data.data.resultData.error) { + // The execution did fail + if (acknowledgeMode === 'executionFinishesSuccessfully') { + channel.nack(message); + messageTracker.answered(message); + return; + } + } + channel.ack(message); + messageTracker.answered(message); + }); + } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { + await responsePromiseHook.promise.then(() => { + channel.ack(message); + messageTracker.answered(message); + }); + } else { + // Acknowledge message directly + channel.ack(message); + } + } catch (error) { + const workflow = this.getWorkflow(); + const node = this.getNode(); + if (acknowledgeMode !== 'immediately') { + messageTracker.answered(message); + } + + this.logger.error( + `There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`, + { + node: node.name, + workflowId: workflow.id, + }, + ); + } +} diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index f5422a7da9..dfa273197a 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -1,19 +1,16 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ import type { Message } from 'amqplib'; import type { - IDeferredPromise, - IExecuteResponsePromiseData, INodeProperties, INodeType, INodeTypeDescription, - IRun, ITriggerFunctions, ITriggerResponse, } from 'n8n-workflow'; import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; import { rabbitDefaultOptions } from './DefaultOptions'; -import { MessageTracker, rabbitmqConnectQueue, parseMessage } from './GenericFunctions'; +import { MessageTracker, rabbitmqConnectQueue, handleMessage } from './GenericFunctions'; import type { TriggerOptions } from './types'; export class RabbitMQTrigger implements INodeType { @@ -207,131 +204,10 @@ export class RabbitMQTrigger implements INodeType { const options = this.getNodeParameter('options', {}) as TriggerOptions; const channel = await rabbitmqConnectQueue.call(this, queue, options); - if (this.getMode() === 'manual') { - const manualTriggerFunction = async () => { - // Do only catch a single message when executing manually, else messages will leak - await channel.prefetch(1); - - const processMessage = async (message: Message | null) => { - if (message !== null) { - const item = await parseMessage(message, options, this.helpers); - channel.ack(message); - this.emit([[item]]); - } else { - this.emitError(new Error('Connection got closed unexpectedly')); - } - }; - - const existingMessage = await channel.get(queue); - if (existingMessage) await processMessage(existingMessage); - else await channel.consume(queue, processMessage); - }; - - const closeFunction = async () => { - await channel.close(); - await channel.connection.close(); - return; - }; - - return { - closeFunction, - manualTriggerFunction, - }; - } - - const parallelMessages = options.parallelMessages ?? -1; - if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) { - throw new NodeOperationError( - this.getNode(), - 'Parallel message processing limit must be a number greater than zero (or -1 for no limit)', - ); - } - - let acknowledgeMode = options.acknowledge ?? 'immediately'; - - if (parallelMessages !== -1 && acknowledgeMode === 'immediately') { - // If parallel message limit is set, then the default mode is "executionFinishes" - // unless acknowledgeMode got set specifically. Be aware that the mode "immediately" - // can not be supported in this case. - acknowledgeMode = 'executionFinishes'; - } - const messageTracker = new MessageTracker(); + let acknowledgeMode = options.acknowledge ?? 'immediately'; let closeGotCalled = false; - - if (parallelMessages !== -1) { - await channel.prefetch(parallelMessages); - } - - channel.on('close', () => { - if (!closeGotCalled) { - this.emitError(new Error('Connection got closed unexpectedly')); - } - }); - - const consumerInfo = await channel.consume(queue, async (message) => { - if (message !== null) { - try { - if (acknowledgeMode !== 'immediately') { - messageTracker.received(message); - } - - const item = await parseMessage(message, options, this.helpers); - - let responsePromise: IDeferredPromise | undefined = undefined; - let responsePromiseHook: IDeferredPromise | undefined = - undefined; - if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { - responsePromise = this.helpers.createDeferredPromise(); - } else if (acknowledgeMode === 'laterMessageNode') { - responsePromiseHook = this.helpers.createDeferredPromise(); - } - if (responsePromiseHook) { - this.emit([[item]], responsePromiseHook, undefined); - } else { - this.emit([[item]], undefined, responsePromise); - } - if (responsePromise && acknowledgeMode !== 'laterMessageNode') { - // Acknowledge message after the execution finished - await responsePromise.promise.then(async (data: IRun) => { - if (data.data.resultData.error) { - // The execution did fail - if (acknowledgeMode === 'executionFinishesSuccessfully') { - channel.nack(message); - messageTracker.answered(message); - return; - } - } - channel.ack(message); - messageTracker.answered(message); - }); - } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { - await responsePromiseHook.promise.then(() => { - channel.ack(message); - messageTracker.answered(message); - }); - } else { - // Acknowledge message directly - channel.ack(message); - } - } catch (error) { - const workflow = this.getWorkflow(); - const node = this.getNode(); - if (acknowledgeMode !== 'immediately') { - messageTracker.answered(message); - } - - this.logger.error( - `There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`, - { - node: node.name, - workflowId: workflow.id, - }, - ); - } - } - }); - const consumerTag = consumerInfo.consumerTag; + let consumerTag: string | undefined; // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. @@ -352,6 +228,73 @@ export class RabbitMQTrigger implements INodeType { } }; + if (this.getMode() === 'manual') { + const manualTriggerFunction = async () => { + // Do only catch a single message when executing manually, else messages will leak + await channel.prefetch(1); + + const processMessage = async (message: Message | null) => { + if (message !== null) { + void handleMessage.call( + this, + message, + channel, + messageTracker, + acknowledgeMode, + options, + ); + } else { + this.emitError(new Error('Connection got closed unexpectedly')); + } + }; + + const existingMessage = await channel.get(queue); + if (existingMessage) { + await processMessage(existingMessage); + } else { + const consumerInfo = await channel.consume(queue, processMessage); + consumerTag = consumerInfo.consumerTag; + } + }; + + return { + closeFunction, + manualTriggerFunction, + }; + } + + const parallelMessages = options.parallelMessages ?? -1; + if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) { + throw new NodeOperationError( + this.getNode(), + 'Parallel message processing limit must be a number greater than zero (or -1 for no limit)', + ); + } + + if (parallelMessages !== -1 && acknowledgeMode === 'immediately') { + // If parallel message limit is set, then the default mode is "executionFinishes" + // unless acknowledgeMode got set specifically. Be aware that the mode "immediately" + // can not be supported in this case. + acknowledgeMode = 'executionFinishes'; + } + + if (parallelMessages !== -1) { + await channel.prefetch(parallelMessages); + } + + channel.on('close', () => { + if (!closeGotCalled) { + this.emitError(new Error('Connection got closed unexpectedly')); + } + }); + + const consumerInfo = await channel.consume(queue, async (message) => { + if (message !== null) { + void handleMessage.call(this, message, channel, messageTracker, acknowledgeMode, options); + } + }); + consumerTag = consumerInfo.consumerTag; + return { closeFunction, }; diff --git a/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts b/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts index 62fb7d8f6c..f649360d8e 100644 --- a/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts +++ b/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts @@ -1,6 +1,6 @@ import type { Channel, Connection, ConsumeMessage, Message } from 'amqplib'; -import { mock } from 'jest-mock-extended'; -import type { ITriggerFunctions } from 'n8n-workflow'; +import { mock, mockDeep } from 'jest-mock-extended'; +import type { INode, IRun, ITriggerFunctions, IWorkflowMetadata } from 'n8n-workflow'; const mockChannel = mock(); const mockConnection = mock({ createChannel: async () => mockChannel }); @@ -15,6 +15,7 @@ import { rabbitmqConnectQueue, rabbitmqCreateChannel, MessageTracker, + handleMessage, } from '../GenericFunctions'; import type { TriggerOptions } from '../types'; @@ -26,7 +27,7 @@ describe('RabbitMQ GenericFunctions', () => { password: 'pass', vhost: '/', }; - const context = mock(); + const context = mockDeep(); beforeEach(() => jest.clearAllMocks()); @@ -189,4 +190,248 @@ describe('RabbitMQ GenericFunctions', () => { expect(mockConnection.close).toHaveBeenCalled(); }); }); + + describe('handleMessage', () => { + const mockChannel = mockDeep(); + const messageTracker = mock(); + const message = { + content: { + foo: 'bar', + }, + } as unknown as Message; + const item = { json: message }; + const options = {} as TriggerOptions; + + it('should ack a message with "acknowledgeMode" set to "immediately"', async () => { + await handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'immediately', + options, + ); + + expect(context.emit).toHaveBeenCalledWith([[item]], undefined, undefined); + expect(mockChannel.ack).toHaveBeenCalledWith(message); + }); + + it('should ack a message with "acknowledgeMode" set to "executionFinishesSuccessfully"', async () => { + let resolvePromise: (data: IRun) => void = () => {}; + const deferredPromise = { + promise: new Promise((resolve) => { + resolvePromise = resolve; + }), + resolve: jest.fn(), + reject: jest.fn(), + }; + context.helpers.createDeferredPromise.mockReturnValue(deferredPromise); + + const handleMessagePromise = handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'executionFinishesSuccessfully', + options, + ); + + await Promise.resolve(); // yield control to let handleMessage run + + expect(messageTracker.received).toHaveBeenCalledWith(message); + expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise); + expect(mockChannel.ack).not.toHaveBeenCalled(); + expect(messageTracker.answered).not.toHaveBeenCalled(); + + resolvePromise({ + data: { + resultData: { + error: undefined, + }, + }, + } as IRun); + await handleMessagePromise; + + expect(mockChannel.ack).toHaveBeenCalledWith(message); + expect(messageTracker.answered).toHaveBeenCalledWith(message); + }); + + it('should nack a message with "acknowledgeMode" set to "executionFinishesSuccessfully" when there is an error', async () => { + let resolvePromise: (data: IRun) => void = () => {}; + const deferredPromise = { + promise: new Promise((resolve) => { + resolvePromise = resolve; + }), + resolve: jest.fn(), + reject: jest.fn(), + }; + context.helpers.createDeferredPromise.mockReturnValue(deferredPromise); + + const handleMessagePromise = handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'executionFinishesSuccessfully', + options, + ); + + await Promise.resolve(); // yield control to let handleMessage run + + expect(messageTracker.received).toHaveBeenCalledWith(message); + expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise); + expect(mockChannel.nack).not.toHaveBeenCalled(); + expect(messageTracker.answered).not.toHaveBeenCalled(); + + resolvePromise({ + data: { + resultData: { + error: new Error('Some error'), + }, + }, + } as IRun); + await handleMessagePromise; + + expect(mockChannel.nack).toHaveBeenCalledWith(message); + expect(messageTracker.answered).toHaveBeenCalledWith(message); + }); + + it('should ack a message with "acknowledgeMode" set to "executionFinishes"', async () => { + let resolvePromise: (data: IRun) => void = () => {}; + const deferredPromise = { + promise: new Promise((resolve) => { + resolvePromise = resolve; + }), + resolve: jest.fn(), + reject: jest.fn(), + }; + context.helpers.createDeferredPromise.mockReturnValue(deferredPromise); + + const handleMessagePromise = handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'executionFinishes', + options, + ); + + await Promise.resolve(); // yield control to let handleMessage run + + expect(messageTracker.received).toHaveBeenCalledWith(message); + expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise); + expect(mockChannel.ack).not.toHaveBeenCalled(); + expect(messageTracker.answered).not.toHaveBeenCalled(); + + resolvePromise({ + data: { + resultData: { + error: undefined, + }, + }, + } as IRun); + await handleMessagePromise; + + expect(mockChannel.ack).toHaveBeenCalledWith(message); + expect(messageTracker.answered).toHaveBeenCalledWith(message); + }); + + it('should ack a message with "acknowledgeMode" set to "laterMessageNode"', async () => { + let resolvePromise: (data: IRun) => void = () => {}; + const deferredPromise = { + promise: new Promise((resolve) => { + resolvePromise = resolve; + }), + resolve: jest.fn(), + reject: jest.fn(), + }; + context.helpers.createDeferredPromise.mockReturnValue(deferredPromise); + + const handleMessagePromise = handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'laterMessageNode', + options, + ); + + await Promise.resolve(); // yield control to let handleMessage run + + expect(messageTracker.received).toHaveBeenCalledWith(message); + expect(context.emit).toHaveBeenCalledWith([[item]], deferredPromise, undefined); + expect(mockChannel.ack).not.toHaveBeenCalled(); + expect(messageTracker.answered).not.toHaveBeenCalled(); + + resolvePromise({ + data: { + resultData: { + error: undefined, + }, + }, + } as IRun); + await handleMessagePromise; + + expect(mockChannel.ack).toHaveBeenCalledWith(message); + expect(messageTracker.answered).toHaveBeenCalledWith(message); + }); + + it('should handle error when "acknowledgeMode" is set to "immediately"', async () => { + mockChannel.ack.mockImplementation(() => { + throw new Error('Test error'); + }); + context.getWorkflow.mockReturnValue({ + id: '123', + } as IWorkflowMetadata); + context.getNode.mockReturnValue({ + name: 'Test node', + } as INode); + + await handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'immediately', + options, + ); + + expect(context.logger.error).toHaveBeenCalledWith( + 'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"', + { + node: 'Test node', + workflowId: '123', + }, + ); + }); + + it('should handle error when "acknowledgeMode" is set to something other than "immediately"', async () => { + context.helpers.createDeferredPromise.mockImplementation(() => { + throw new Error('Test error'); + }); + context.getWorkflow.mockReturnValue({ + id: '123', + } as IWorkflowMetadata); + context.getNode.mockReturnValue({ + name: 'Test node', + } as INode); + + await handleMessage.call( + context, + message, + mockChannel, + messageTracker, + 'executionFinishesSuccessfully', + options, + ); + + expect(context.logger.error).toHaveBeenCalledWith( + 'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"', + { + node: 'Test node', + workflowId: '123', + }, + ); + }); + }); }); diff --git a/packages/nodes-base/nodes/RabbitMQ/test/RabbitMQTrigger.node.test.ts b/packages/nodes-base/nodes/RabbitMQ/test/RabbitMQTrigger.node.test.ts new file mode 100644 index 0000000000..4abb8b9bfb --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/test/RabbitMQTrigger.node.test.ts @@ -0,0 +1,118 @@ +import { mockDeep } from 'jest-mock-extended'; +import type { ITriggerFunctions } from 'n8n-workflow'; +import * as GenericFunctions from '../GenericFunctions'; +import type { Channel, GetMessage } from 'amqplib'; +import { RabbitMQTrigger } from '../RabbitMQTrigger.node'; + +describe('RabbitMQTrigger node', () => { + const trigger = new RabbitMQTrigger(); + const mockTriggerFunctions = mockDeep(); + const connectSpy = jest.spyOn(GenericFunctions, 'rabbitmqConnectQueue'); + const handleMessageSpy = jest.spyOn(GenericFunctions, 'handleMessage'); + const mockChannel = mockDeep(); + + beforeEach(() => { + jest.resetAllMocks(); + }); + + describe('manual execution', () => { + it('should get a message from the queue', async () => { + const message = { + content: { + foo: 'bar', + }, + fields: { + deliveryTag: 1, + }, + }; + const options = { acknowledge: 'immediately' }; + mockTriggerFunctions.getMode.mockReturnValue('manual'); + mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => { + switch (parameterName) { + case 'queue': + return 'testQueue'; + case 'options': + return options; + } + return undefined; + }); + connectSpy.mockResolvedValue(mockChannel); + mockChannel.get.mockResolvedValue(message as unknown as GetMessage); + + const { closeFunction, manualTriggerFunction } = + await trigger.trigger.call(mockTriggerFunctions); + await manualTriggerFunction!(); + + expect(mockChannel.prefetch).toHaveBeenCalledWith(1); + expect(mockChannel.get).toHaveBeenCalledWith('testQueue'); + expect(handleMessageSpy).toHaveBeenCalledWith( + message, + mockChannel, + expect.anything(), + 'immediately', + options, + ); + expect(mockChannel.consume).not.toHaveBeenCalled(); + expect(mockChannel.close).not.toHaveBeenCalled(); + await closeFunction!(); + expect(mockChannel.close).toHaveBeenCalled(); + }); + + it('should listen for a message from the queue', async () => { + const options = { acknowledge: 'immediately' }; + mockTriggerFunctions.getMode.mockReturnValue('manual'); + mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => { + switch (parameterName) { + case 'queue': + return 'testQueue'; + case 'options': + return options; + } + return undefined; + }); + connectSpy.mockResolvedValue(mockChannel); + mockChannel.consume.mockResolvedValue({ + consumerTag: 'testConsumerTag', + }); + + const { closeFunction, manualTriggerFunction } = + await trigger.trigger.call(mockTriggerFunctions); + await manualTriggerFunction!(); + + expect(mockChannel.prefetch).toHaveBeenCalledWith(1); + expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything()); + expect(mockChannel.close).not.toHaveBeenCalled(); + await closeFunction!(); + expect(mockChannel.close).toHaveBeenCalled(); + }); + }); + + describe('regular execution', () => { + it('should listen for a message from the queue', async () => { + const options = { acknowledge: 'immediately' }; + mockTriggerFunctions.getMode.mockReturnValue('trigger'); + mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => { + switch (parameterName) { + case 'queue': + return 'testQueue'; + case 'options': + return options; + } + return undefined; + }); + connectSpy.mockResolvedValue(mockChannel); + mockChannel.consume.mockResolvedValue({ + consumerTag: 'testConsumerTag', + }); + + const { closeFunction } = await trigger.trigger.call(mockTriggerFunctions); + + expect(mockChannel.prefetch).not.toHaveBeenCalled(); + expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything()); + expect(mockChannel.get).not.toHaveBeenCalled(); + expect(mockChannel.close).not.toHaveBeenCalled(); + await closeFunction!(); + expect(mockChannel.close).toHaveBeenCalled(); + }); + }); +});