fix(AMQP Trigger Node): Update rhea library, tweak reconnection options (#18980)

This commit is contained in:
Elias Meire
2025-09-05 18:34:29 +02:00
committed by GitHub
parent 0ba924bda5
commit efc3a2d664
10 changed files with 406 additions and 40 deletions

View File

@@ -0,0 +1,218 @@
import { mock } from 'jest-mock-extended';
import type {
ICredentialDataDecryptedObject,
IExecuteFunctions,
ICredentialTestFunctions,
ICredentialsDecrypted,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { Amqp } from './Amqp.node';
// Mock the entire rhea module
const mockSender = {
close: jest.fn(),
send: jest.fn().mockReturnValue({ id: 'test-message-id' }),
};
const mockConnection = {
close: jest.fn(),
open_sender: jest.fn().mockReturnValue(mockSender),
options: { reconnect: true },
};
const mockContainer = {
connect: jest.fn().mockReturnValue(mockConnection),
on: jest.fn(),
once: jest.fn(),
};
jest.mock('rhea', () => ({
create_container: jest.fn(() => mockContainer),
}));
describe('AMQP Node', () => {
const credentials = mock<ICredentialDataDecryptedObject>({
hostname: 'localhost',
port: 5672,
username: 'testuser',
password: 'testpass',
transportType: 'tcp',
});
const executeFunctions = mock<IExecuteFunctions>({
getNode: jest.fn().mockReturnValue({ name: 'AMQP Test Node' }),
continueOnFail: jest.fn().mockReturnValue(false),
});
beforeEach(() => {
jest.clearAllMocks();
executeFunctions.getCredentials.calledWith('amqp').mockResolvedValue(credentials);
executeFunctions.getInputData.mockReturnValue([{ json: { testing: true } }]);
executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue('test/queue');
executeFunctions.getNodeParameter.calledWith('headerParametersJson', 0).mockReturnValue({});
executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({});
// Setup container event mocking
mockContainer.once.mockImplementation((event: string, callback: any) => {
if (event === 'sendable') {
// Call the callback immediately to simulate successful connection
callback({ sender: mockSender });
}
});
// Mock successful credential validation by making the connection open immediately
mockContainer.on.mockImplementation((event: string, callback: any) => {
if (event === 'connection_open') {
setImmediate(() => callback({}));
}
});
});
it('should throw error when sink is empty', async () => {
executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue('');
await expect(new Amqp().execute.call(executeFunctions)).rejects.toThrow(
new NodeOperationError(executeFunctions.getNode(), 'Queue or Topic required!'),
);
});
it('should send message successfully', async () => {
const result = await new Amqp().execute.call(executeFunctions);
expect(result).toEqual([[{ json: { id: 'test-message-id' }, pairedItems: { item: 0 } }]]);
expect(executeFunctions.getCredentials).toHaveBeenCalledWith('amqp');
expect(mockContainer.connect).toHaveBeenCalled();
expect(mockConnection.open_sender).toHaveBeenCalledWith('test/queue');
expect(mockSender.send).toHaveBeenCalledWith({
application_properties: {},
body: '{"testing":true}',
});
expect(mockSender.close).toHaveBeenCalled();
expect(mockConnection.close).toHaveBeenCalled();
});
it('should send message with custom headers', async () => {
executeFunctions.getNodeParameter
.calledWith('headerParametersJson', 0)
.mockReturnValue('{"custom":"header","priority":1}');
await new Amqp().execute.call(executeFunctions);
expect(mockSender.send).toHaveBeenCalledWith({
application_properties: { custom: 'header', priority: 1 },
body: '{"testing":true}',
});
});
it('should send only specific property when configured', async () => {
executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({
sendOnlyProperty: 'testing',
});
executeFunctions.getInputData.mockReturnValue([{ json: { testing: 'specific-value' } }]);
await new Amqp().execute.call(executeFunctions);
expect(mockSender.send).toHaveBeenCalledWith({
application_properties: {},
body: '"specific-value"',
});
});
it('should send data as object when configured', async () => {
executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({
dataAsObject: true,
});
await new Amqp().execute.call(executeFunctions);
expect(mockSender.send).toHaveBeenCalledWith({
application_properties: {},
body: { testing: true },
});
});
it('should handle multiple input items', async () => {
executeFunctions.getInputData.mockReturnValue([{ json: { item: 1 } }, { json: { item: 2 } }]);
const result = await new Amqp().execute.call(executeFunctions);
expect(result).toEqual([
[
{ json: { id: 'test-message-id' }, pairedItems: { item: 0 } },
{ json: { id: 'test-message-id' }, pairedItems: { item: 1 } },
],
]);
expect(mockSender.send).toHaveBeenCalledTimes(2);
expect(mockSender.send).toHaveBeenNthCalledWith(1, {
application_properties: {},
body: '{"item":1}',
});
expect(mockSender.send).toHaveBeenNthCalledWith(2, {
application_properties: {},
body: '{"item":2}',
});
});
it('should continue on fail when configured', async () => {
executeFunctions.continueOnFail.mockReturnValue(true);
executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue('');
const result = await new Amqp().execute.call(executeFunctions);
expect(result).toEqual([
[{ json: { error: 'Queue or Topic required!' }, pairedItems: { item: 0 } }],
]);
});
describe('credential test', () => {
it('should return success for valid credentials', async () => {
const amqp = new Amqp();
const testFunctions = mock<ICredentialTestFunctions>();
// Mock successful connection
mockContainer.on.mockImplementation((event: string, callback: any) => {
if (event === 'connection_open') {
setImmediate(() => callback({}));
}
});
const result = await amqp.methods.credentialTest.amqpConnectionTest.call(testFunctions, {
data: credentials,
id: 'test',
name: 'test',
type: 'amqp',
} as ICredentialsDecrypted);
expect(result).toEqual({
status: 'OK',
message: 'Connection successful!',
});
});
it('should return error for invalid credentials', async () => {
const amqp = new Amqp();
const testFunctions = mock<ICredentialTestFunctions>();
// Mock failed connection
mockContainer.on.mockImplementation((event: string, callback: any) => {
if (event === 'disconnected') {
setImmediate(() => callback({ error: new Error('Authentication failed') }));
}
});
const result = await amqp.methods.credentialTest.amqpConnectionTest.call(testFunctions, {
data: credentials,
id: 'test',
name: 'test',
type: 'amqp',
} as ICredentialsDecrypted);
expect(result).toEqual({
status: 'Error',
message: 'Authentication failed',
});
});
});
});

View File

@@ -10,21 +10,23 @@ import type {
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import type { Connection, ContainerOptions, Dictionary, EventContext, Sender } from 'rhea';
import type { Connection, ConnectionOptions, Dictionary, EventContext, Sender } from 'rhea';
import { create_container } from 'rhea';
import type { AmqpCredential } from './types';
async function checkIfCredentialsValid(
credentials: IDataObject,
): Promise<INodeCredentialTestResult> {
const connectOptions: ContainerOptions = {
const connectOptions: ConnectionOptions = {
reconnect: false,
host: credentials.hostname as string,
hostname: credentials.hostname as string,
port: credentials.port as number,
username: credentials.username ? (credentials.username as string) : undefined,
password: credentials.password ? (credentials.password as string) : undefined,
transport: credentials.transportType ? (credentials.transportType as string) : undefined,
};
transport: credentials.transportType ? (credentials.transportType as 'tcp' | 'tls') : undefined,
} as unknown as ConnectionOptions;
let conn: Connection | undefined = undefined;
try {
@@ -157,7 +159,7 @@ export class Amqp implements INodeType {
let sender: Sender | undefined = undefined;
try {
const credentials = await this.getCredentials('amqp');
const credentials = await this.getCredentials<AmqpCredential>('amqp');
// check if credentials are valid to avoid unnecessary reconnects
const credentialsTestResult = await checkIfCredentialsValid(credentials);
@@ -190,7 +192,8 @@ export class Amqp implements INodeType {
/*
Values are documented here: https://github.com/amqp/rhea#container
*/
const connectOptions: ContainerOptions = {
const connectOptions: ConnectionOptions = {
host: credentials.hostname,
hostname: credentials.hostname,
port: credentials.port,
@@ -201,7 +204,7 @@ export class Amqp implements INodeType {
id: containerId ? containerId : undefined,
reconnect: containerReconnect,
reconnect_limit: containerReconnectLimit,
};
} as unknown as ConnectionOptions;
const node = this.getNode();

View File

@@ -0,0 +1,128 @@
import { NodeOperationError } from 'n8n-workflow';
import { testTriggerNode } from '@test/nodes/TriggerHelpers';
import { AmqpTrigger } from './AmqpTrigger.node';
let eventHandlers: Record<string, (...args: unknown[]) => void> = {};
const mockAddCredit = jest.fn();
const mockClose = jest.fn();
const mockOpenReceiver = jest.fn();
const mockConnection = {
open_receiver: mockOpenReceiver,
close: mockClose,
};
jest.mock('rhea', () => ({
create_container: jest.fn(() => ({
on: (event: string, handler: (...args: unknown[]) => void) => {
eventHandlers[event] = handler;
},
removeAllListeners: jest.fn((event: string) => {
delete eventHandlers[event];
}),
connect: jest.fn(() => mockConnection),
})),
}));
describe('AMQP Trigger Node', () => {
beforeEach(() => {
jest.clearAllMocks();
eventHandlers = {};
});
it('should throw if no sink provided', async () => {
await expect(
testTriggerNode(AmqpTrigger, {
mode: 'trigger',
node: { parameters: { sink: '' } },
credential: { hostname: 'localhost', port: 5672 },
}),
).rejects.toThrow(NodeOperationError);
});
it('should emit a full message in trigger mode', async () => {
const { emit, close } = await testTriggerNode(AmqpTrigger, {
mode: 'trigger',
node: { parameters: { sink: 'queue://test' } },
credential: { hostname: 'localhost', port: 5672 },
});
eventHandlers['receiver_open']({ receiver: { add_credit: mockAddCredit } });
expect(mockAddCredit).toHaveBeenCalledWith(100);
const message = { body: 'hello', message_id: 1 };
eventHandlers['message']({
message,
});
expect(emit).toHaveBeenCalledWith([[{ json: message }]]);
await close();
expect(mockClose).toHaveBeenCalled();
});
it('should parse JSON body when jsonParseBody = true', async () => {
const { emit } = await testTriggerNode(AmqpTrigger, {
mode: 'trigger',
node: { parameters: { sink: 'queue://test', options: { jsonParseBody: true } } },
credential: { hostname: 'localhost', port: 5672 },
});
eventHandlers['message']({
message: { body: '{"foo":"bar"}', message_id: 2 },
});
expect(emit).toHaveBeenCalledWith([[{ json: { body: { foo: 'bar' }, message_id: 2 } }]]);
});
it('should return only body when onlyBody = true', async () => {
const { emit } = await testTriggerNode(AmqpTrigger, {
mode: 'trigger',
node: { parameters: { sink: 'queue://test', options: { onlyBody: true } } },
credential: { hostname: 'localhost', port: 5672 },
});
eventHandlers['message']({
message: { body: { nested: true }, message_id: 3 },
});
expect(emit).toHaveBeenCalledWith([[{ json: { nested: true } }]]);
});
it('should reject in manual mode after 15s with no message', async () => {
const timeoutSpy = jest.spyOn(global, 'setTimeout').mockImplementation((fn) => {
fn(); // fire immediately
return 1 as unknown as NodeJS.Timeout;
});
const { manualTriggerFunction } = await testTriggerNode(AmqpTrigger, {
mode: 'manual',
node: { parameters: { sink: 'queue://test' } },
credential: { hostname: 'localhost', port: 5672 },
});
await expect(manualTriggerFunction?.()).rejects.toThrow(
'Aborted because no message received within 15 seconds',
);
timeoutSpy.mockRestore();
});
it('should resolve in manual mode when a message arrives', async () => {
const { manualTriggerFunction, emit } = await testTriggerNode(AmqpTrigger, {
mode: 'manual',
node: { parameters: { sink: 'queue://test' } },
credential: { hostname: 'localhost', port: 5672 },
});
const manualTriggerPromise = manualTriggerFunction?.();
eventHandlers['message']({
message: { body: '{"foo":"bar"}', message_id: 2 },
});
await manualTriggerPromise;
expect(emit).toHaveBeenCalledWith([[{ json: { body: '{"foo":"bar"}', message_id: 2 } }]]);
});
});

View File

@@ -8,9 +8,11 @@ import type {
IRun,
} from 'n8n-workflow';
import { deepCopy, jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import type { ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea';
import type { ConnectionOptions, EventContext, Message, ReceiverOptions } from 'rhea';
import { create_container } from 'rhea';
import type { AmqpCredential } from './types';
export class AmqpTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'AMQP Trigger',
@@ -136,7 +138,7 @@ export class AmqpTrigger implements INodeType {
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const credentials = await this.getCredentials('amqp');
const credentials = await this.getCredentials<AmqpCredential>('amqp');
const sink = this.getNodeParameter('sink', '') as string;
const clientname = this.getNodeParameter('clientname', '') as string;
@@ -146,7 +148,8 @@ export class AmqpTrigger implements INodeType {
const pullMessagesNumber = (options.pullMessagesNumber as number) || 100;
const containerId = options.containerId as string;
const containerReconnect = (options.reconnect as boolean) || true;
const containerReconnectLimit = (options.reconnectLimit as number) || 50;
// Keep reconnecting (exponential backoff) forever unless user sets a limit
const containerReconnectLimit = (options.reconnectLimit as number) ?? undefined;
if (sink === '') {
throw new NodeOperationError(this.getNode(), 'Queue or Topic required!');
@@ -227,20 +230,22 @@ export class AmqpTrigger implements INodeType {
});
/*
Values are documentet here: https://github.com/amqp/rhea#container
Values are documented here: https://github.com/amqp/rhea#container
*/
const connectOptions: ContainerOptions = {
const connectOptions: ConnectionOptions = {
host: credentials.hostname,
hostname: credentials.hostname,
port: credentials.port,
reconnect: containerReconnect,
reconnect_limit: containerReconnectLimit,
// Try reconnection even if caused by a fatal error
all_errors_non_fatal: true,
username: credentials.username ? credentials.username : undefined,
password: credentials.password ? credentials.password : undefined,
transport: credentials.transportType ? credentials.transportType : undefined,
container_id: containerId ? containerId : undefined,
id: containerId ? containerId : undefined,
};
} as unknown as ConnectionOptions;
const connection = container.connect(connectOptions);
const clientOptions: ReceiverOptions = {

View File

@@ -0,0 +1,7 @@
export type AmqpCredential = {
hostname: string;
port: number;
username?: string;
password?: string;
transportType?: 'tcp' | 'tls';
};

View File

@@ -310,7 +310,7 @@ describe('KafkaTrigger Node', () => {
});
it('should handle manual trigger mode', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
const { emit, manualTriggerFunction } = await testTriggerNode(KafkaTrigger, {
mode: 'manual',
node: {
parameters: {
@@ -330,6 +330,8 @@ describe('KafkaTrigger Node', () => {
},
});
await manualTriggerFunction?.();
expect(mockConsumerConnect).toHaveBeenCalledTimes(1);
expect(mockConsumerSubscribe).toHaveBeenCalledTimes(1);
expect(mockConsumerRun).toHaveBeenCalledTimes(1);

View File

@@ -108,13 +108,15 @@ describe('ScheduleTrigger', () => {
});
it('should emit when manually executed', async () => {
const { emit } = await testTriggerNode(ScheduleTrigger, {
const { emit, manualTriggerFunction } = await testTriggerNode(ScheduleTrigger, {
mode: 'manual',
timezone,
node: { parameters: { rule: { interval: [{ field: 'hours', hoursInterval: 3 }] } } },
workflowStaticData: { recurrenceRules: [] },
});
await manualTriggerFunction?.();
expect(emit).toHaveBeenCalledTimes(1);
const firstTriggerData = emit.mock.calls[0][0][0][0];
@@ -134,25 +136,26 @@ describe('ScheduleTrigger', () => {
});
it('should throw on invalid cron expressions in manual mode', async () => {
await expect(
testTriggerNode(ScheduleTrigger, {
mode: 'manual',
timezone,
node: {
parameters: {
rule: {
interval: [
{
field: 'cronExpression',
expression: '@daily *', // adding extra fields to shorthand not allowed -> invalid
},
],
},
const { manualTriggerFunction } = await testTriggerNode(ScheduleTrigger, {
mode: 'manual',
timezone,
node: {
parameters: {
rule: {
interval: [
{
field: 'cronExpression',
expression: '@daily *', // adding extra fields to shorthand not allowed -> invalid
},
],
},
},
workflowStaticData: {},
}),
).rejects.toBeInstanceOf(n8nWorkflow.NodeOperationError);
},
workflowStaticData: {},
});
await expect(manualTriggerFunction?.()).rejects.toBeInstanceOf(
n8nWorkflow.NodeOperationError,
);
});
});
});

View File

@@ -936,7 +936,7 @@
"pyodide": "0.28.0",
"redis": "4.6.14",
"rfc2047": "4.0.1",
"rhea": "1.0.24",
"rhea": "3.0.4",
"rrule": "2.8.1",
"rss-parser": "3.13.0",
"sanitize-html": "2.12.1",

View File

@@ -120,11 +120,11 @@ export async function testTriggerNode(
if (options.mode === 'manual') {
expect(response?.manualTriggerFunction).toBeInstanceOf(Function);
await response?.manualTriggerFunction?.();
}
return {
close: jest.fn(response?.closeFunction),
manualTriggerFunction: options.mode === 'manual' ? response?.manualTriggerFunction : undefined,
emit,
};
}