fix(RabbitMQ Trigger Node): Respect the "Delete From Queue When" option with manual executions (#17554)

This commit is contained in:
RomanDavydchuk
2025-07-23 13:06:21 +03:00
committed by GitHub
parent c790f7e047
commit 2bd0aa38e2
4 changed files with 512 additions and 134 deletions

View File

@@ -1,8 +1,11 @@
import * as amqplib from 'amqplib';
import type {
IDeferredPromise,
IExecuteResponsePromiseData,
IDataObject,
IExecuteFunctions,
INodeExecutionData,
IRun,
ITriggerFunctions,
} from 'n8n-workflow';
import { jsonParse, sleep } from 'n8n-workflow';
@@ -114,11 +117,11 @@ export class MessageTracker {
isClosing = false;
received(message: amqplib.ConsumeMessage) {
received(message: amqplib.Message) {
this.messages.push(message.fields.deliveryTag);
}
answered(message: amqplib.ConsumeMessage) {
answered(message: amqplib.Message) {
if (this.messages.length === 0) {
return;
}
@@ -131,14 +134,16 @@ export class MessageTracker {
return this.messages.length;
}
async closeChannel(channel: amqplib.Channel, consumerTag: string) {
async closeChannel(channel: amqplib.Channel, consumerTag?: string) {
if (this.isClosing) {
return;
}
this.isClosing = true;
// Do not accept any new messages
await channel.cancel(consumerTag);
if (consumerTag) {
await channel.cancel(consumerTag);
}
let count = 0;
let unansweredMessages = this.unansweredMessages();
@@ -195,3 +200,70 @@ export const parseMessage = async (
}
}
};
export async function handleMessage(
this: ITriggerFunctions,
message: amqplib.Message,
channel: amqplib.Channel,
messageTracker: MessageTracker,
acknowledgeMode: string,
options: TriggerOptions,
) {
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
const item = await parseMessage(message, options, this.helpers);
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined = undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook = this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise.then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise.then(() => {
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
this.logger.error(
`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
}

View File

@@ -1,19 +1,16 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import type { Message } from 'amqplib';
import type {
IDeferredPromise,
IExecuteResponsePromiseData,
INodeProperties,
INodeType,
INodeTypeDescription,
IRun,
ITriggerFunctions,
ITriggerResponse,
} from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import { rabbitDefaultOptions } from './DefaultOptions';
import { MessageTracker, rabbitmqConnectQueue, parseMessage } from './GenericFunctions';
import { MessageTracker, rabbitmqConnectQueue, handleMessage } from './GenericFunctions';
import type { TriggerOptions } from './types';
export class RabbitMQTrigger implements INodeType {
@@ -207,131 +204,10 @@ export class RabbitMQTrigger implements INodeType {
const options = this.getNodeParameter('options', {}) as TriggerOptions;
const channel = await rabbitmqConnectQueue.call(this, queue, options);
if (this.getMode() === 'manual') {
const manualTriggerFunction = async () => {
// Do only catch a single message when executing manually, else messages will leak
await channel.prefetch(1);
const processMessage = async (message: Message | null) => {
if (message !== null) {
const item = await parseMessage(message, options, this.helpers);
channel.ack(message);
this.emit([[item]]);
} else {
this.emitError(new Error('Connection got closed unexpectedly'));
}
};
const existingMessage = await channel.get(queue);
if (existingMessage) await processMessage(existingMessage);
else await channel.consume(queue, processMessage);
};
const closeFunction = async () => {
await channel.close();
await channel.connection.close();
return;
};
return {
closeFunction,
manualTriggerFunction,
};
}
const parallelMessages = options.parallelMessages ?? -1;
if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) {
throw new NodeOperationError(
this.getNode(),
'Parallel message processing limit must be a number greater than zero (or -1 for no limit)',
);
}
let acknowledgeMode = options.acknowledge ?? 'immediately';
if (parallelMessages !== -1 && acknowledgeMode === 'immediately') {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes';
}
const messageTracker = new MessageTracker();
let acknowledgeMode = options.acknowledge ?? 'immediately';
let closeGotCalled = false;
if (parallelMessages !== -1) {
await channel.prefetch(parallelMessages);
}
channel.on('close', () => {
if (!closeGotCalled) {
this.emitError(new Error('Connection got closed unexpectedly'));
}
});
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
const item = await parseMessage(message, options, this.helpers);
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook = this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise.then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise.then(() => {
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
this.logger.error(
`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
}
});
const consumerTag = consumerInfo.consumerTag;
let consumerTag: string | undefined;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
@@ -352,6 +228,73 @@ export class RabbitMQTrigger implements INodeType {
}
};
if (this.getMode() === 'manual') {
const manualTriggerFunction = async () => {
// Do only catch a single message when executing manually, else messages will leak
await channel.prefetch(1);
const processMessage = async (message: Message | null) => {
if (message !== null) {
void handleMessage.call(
this,
message,
channel,
messageTracker,
acknowledgeMode,
options,
);
} else {
this.emitError(new Error('Connection got closed unexpectedly'));
}
};
const existingMessage = await channel.get(queue);
if (existingMessage) {
await processMessage(existingMessage);
} else {
const consumerInfo = await channel.consume(queue, processMessage);
consumerTag = consumerInfo.consumerTag;
}
};
return {
closeFunction,
manualTriggerFunction,
};
}
const parallelMessages = options.parallelMessages ?? -1;
if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) {
throw new NodeOperationError(
this.getNode(),
'Parallel message processing limit must be a number greater than zero (or -1 for no limit)',
);
}
if (parallelMessages !== -1 && acknowledgeMode === 'immediately') {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes';
}
if (parallelMessages !== -1) {
await channel.prefetch(parallelMessages);
}
channel.on('close', () => {
if (!closeGotCalled) {
this.emitError(new Error('Connection got closed unexpectedly'));
}
});
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
void handleMessage.call(this, message, channel, messageTracker, acknowledgeMode, options);
}
});
consumerTag = consumerInfo.consumerTag;
return {
closeFunction,
};

View File

@@ -1,6 +1,6 @@
import type { Channel, Connection, ConsumeMessage, Message } from 'amqplib';
import { mock } from 'jest-mock-extended';
import type { ITriggerFunctions } from 'n8n-workflow';
import { mock, mockDeep } from 'jest-mock-extended';
import type { INode, IRun, ITriggerFunctions, IWorkflowMetadata } from 'n8n-workflow';
const mockChannel = mock<Channel>();
const mockConnection = mock<Connection>({ createChannel: async () => mockChannel });
@@ -15,6 +15,7 @@ import {
rabbitmqConnectQueue,
rabbitmqCreateChannel,
MessageTracker,
handleMessage,
} from '../GenericFunctions';
import type { TriggerOptions } from '../types';
@@ -26,7 +27,7 @@ describe('RabbitMQ GenericFunctions', () => {
password: 'pass',
vhost: '/',
};
const context = mock<ITriggerFunctions>();
const context = mockDeep<ITriggerFunctions>();
beforeEach(() => jest.clearAllMocks());
@@ -189,4 +190,248 @@ describe('RabbitMQ GenericFunctions', () => {
expect(mockConnection.close).toHaveBeenCalled();
});
});
describe('handleMessage', () => {
const mockChannel = mockDeep<Channel>();
const messageTracker = mock<MessageTracker>();
const message = {
content: {
foo: 'bar',
},
} as unknown as Message;
const item = { json: message };
const options = {} as TriggerOptions;
it('should ack a message with "acknowledgeMode" set to "immediately"', async () => {
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'immediately',
options,
);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, undefined);
expect(mockChannel.ack).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "executionFinishesSuccessfully"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should nack a message with "acknowledgeMode" set to "executionFinishesSuccessfully" when there is an error', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.nack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: new Error('Some error'),
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.nack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "executionFinishes"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishes',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "laterMessageNode"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'laterMessageNode',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], deferredPromise, undefined);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should handle error when "acknowledgeMode" is set to "immediately"', async () => {
mockChannel.ack.mockImplementation(() => {
throw new Error('Test error');
});
context.getWorkflow.mockReturnValue({
id: '123',
} as IWorkflowMetadata);
context.getNode.mockReturnValue({
name: 'Test node',
} as INode);
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'immediately',
options,
);
expect(context.logger.error).toHaveBeenCalledWith(
'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"',
{
node: 'Test node',
workflowId: '123',
},
);
});
it('should handle error when "acknowledgeMode" is set to something other than "immediately"', async () => {
context.helpers.createDeferredPromise.mockImplementation(() => {
throw new Error('Test error');
});
context.getWorkflow.mockReturnValue({
id: '123',
} as IWorkflowMetadata);
context.getNode.mockReturnValue({
name: 'Test node',
} as INode);
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
expect(context.logger.error).toHaveBeenCalledWith(
'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"',
{
node: 'Test node',
workflowId: '123',
},
);
});
});
});

View File

@@ -0,0 +1,118 @@
import { mockDeep } from 'jest-mock-extended';
import type { ITriggerFunctions } from 'n8n-workflow';
import * as GenericFunctions from '../GenericFunctions';
import type { Channel, GetMessage } from 'amqplib';
import { RabbitMQTrigger } from '../RabbitMQTrigger.node';
describe('RabbitMQTrigger node', () => {
const trigger = new RabbitMQTrigger();
const mockTriggerFunctions = mockDeep<ITriggerFunctions>();
const connectSpy = jest.spyOn(GenericFunctions, 'rabbitmqConnectQueue');
const handleMessageSpy = jest.spyOn(GenericFunctions, 'handleMessage');
const mockChannel = mockDeep<Channel>();
beforeEach(() => {
jest.resetAllMocks();
});
describe('manual execution', () => {
it('should get a message from the queue', async () => {
const message = {
content: {
foo: 'bar',
},
fields: {
deliveryTag: 1,
},
};
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('manual');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.get.mockResolvedValue(message as unknown as GetMessage);
const { closeFunction, manualTriggerFunction } =
await trigger.trigger.call(mockTriggerFunctions);
await manualTriggerFunction!();
expect(mockChannel.prefetch).toHaveBeenCalledWith(1);
expect(mockChannel.get).toHaveBeenCalledWith('testQueue');
expect(handleMessageSpy).toHaveBeenCalledWith(
message,
mockChannel,
expect.anything(),
'immediately',
options,
);
expect(mockChannel.consume).not.toHaveBeenCalled();
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
it('should listen for a message from the queue', async () => {
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('manual');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.consume.mockResolvedValue({
consumerTag: 'testConsumerTag',
});
const { closeFunction, manualTriggerFunction } =
await trigger.trigger.call(mockTriggerFunctions);
await manualTriggerFunction!();
expect(mockChannel.prefetch).toHaveBeenCalledWith(1);
expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything());
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
});
describe('regular execution', () => {
it('should listen for a message from the queue', async () => {
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('trigger');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.consume.mockResolvedValue({
consumerTag: 'testConsumerTag',
});
const { closeFunction } = await trigger.trigger.call(mockTriggerFunctions);
expect(mockChannel.prefetch).not.toHaveBeenCalled();
expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything());
expect(mockChannel.get).not.toHaveBeenCalled();
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
});
});