From efc3a2d66484a14338cb67aa1135e71d25fb5c4e Mon Sep 17 00:00:00 2001 From: Elias Meire Date: Fri, 5 Sep 2025 18:34:29 +0200 Subject: [PATCH] fix(AMQP Trigger Node): Update rhea library, tweak reconnection options (#18980) --- .../nodes-base/nodes/Amqp/Amqp.node.test.ts | 218 ++++++++++++++++++ packages/nodes-base/nodes/Amqp/Amqp.node.ts | 17 +- .../nodes/Amqp/AmqpTrigger.node.test.ts | 128 ++++++++++ .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 17 +- packages/nodes-base/nodes/Amqp/types.ts | 7 + .../Kafka/test/KafkaTrigger.node.test.ts | 4 +- .../test/ScheduleTrigger.node.test.ts | 39 ++-- packages/nodes-base/package.json | 2 +- .../nodes-base/test/nodes/TriggerHelpers.ts | 2 +- pnpm-lock.yaml | 12 +- 10 files changed, 406 insertions(+), 40 deletions(-) create mode 100644 packages/nodes-base/nodes/Amqp/Amqp.node.test.ts create mode 100644 packages/nodes-base/nodes/Amqp/AmqpTrigger.node.test.ts create mode 100644 packages/nodes-base/nodes/Amqp/types.ts diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.test.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.test.ts new file mode 100644 index 0000000000..a936f45600 --- /dev/null +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.test.ts @@ -0,0 +1,218 @@ +import { mock } from 'jest-mock-extended'; +import type { + ICredentialDataDecryptedObject, + IExecuteFunctions, + ICredentialTestFunctions, + ICredentialsDecrypted, +} from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import { Amqp } from './Amqp.node'; + +// Mock the entire rhea module +const mockSender = { + close: jest.fn(), + send: jest.fn().mockReturnValue({ id: 'test-message-id' }), +}; + +const mockConnection = { + close: jest.fn(), + open_sender: jest.fn().mockReturnValue(mockSender), + options: { reconnect: true }, +}; + +const mockContainer = { + connect: jest.fn().mockReturnValue(mockConnection), + on: jest.fn(), + once: jest.fn(), +}; + +jest.mock('rhea', () => ({ + create_container: jest.fn(() => mockContainer), +})); + +describe('AMQP Node', () => { + const credentials = mock({ + hostname: 'localhost', + port: 5672, + username: 'testuser', + password: 'testpass', + transportType: 'tcp', + }); + + const executeFunctions = mock({ + getNode: jest.fn().mockReturnValue({ name: 'AMQP Test Node' }), + continueOnFail: jest.fn().mockReturnValue(false), + }); + + beforeEach(() => { + jest.clearAllMocks(); + + executeFunctions.getCredentials.calledWith('amqp').mockResolvedValue(credentials); + executeFunctions.getInputData.mockReturnValue([{ json: { testing: true } }]); + executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue('test/queue'); + executeFunctions.getNodeParameter.calledWith('headerParametersJson', 0).mockReturnValue({}); + executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({}); + + // Setup container event mocking + mockContainer.once.mockImplementation((event: string, callback: any) => { + if (event === 'sendable') { + // Call the callback immediately to simulate successful connection + callback({ sender: mockSender }); + } + }); + + // Mock successful credential validation by making the connection open immediately + mockContainer.on.mockImplementation((event: string, callback: any) => { + if (event === 'connection_open') { + setImmediate(() => callback({})); + } + }); + }); + + it('should throw error when sink is empty', async () => { + executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue(''); + + await expect(new Amqp().execute.call(executeFunctions)).rejects.toThrow( + new NodeOperationError(executeFunctions.getNode(), 'Queue or Topic required!'), + ); + }); + + it('should send message successfully', async () => { + const result = await new Amqp().execute.call(executeFunctions); + + expect(result).toEqual([[{ json: { id: 'test-message-id' }, pairedItems: { item: 0 } }]]); + expect(executeFunctions.getCredentials).toHaveBeenCalledWith('amqp'); + expect(mockContainer.connect).toHaveBeenCalled(); + expect(mockConnection.open_sender).toHaveBeenCalledWith('test/queue'); + expect(mockSender.send).toHaveBeenCalledWith({ + application_properties: {}, + body: '{"testing":true}', + }); + expect(mockSender.close).toHaveBeenCalled(); + expect(mockConnection.close).toHaveBeenCalled(); + }); + + it('should send message with custom headers', async () => { + executeFunctions.getNodeParameter + .calledWith('headerParametersJson', 0) + .mockReturnValue('{"custom":"header","priority":1}'); + + await new Amqp().execute.call(executeFunctions); + + expect(mockSender.send).toHaveBeenCalledWith({ + application_properties: { custom: 'header', priority: 1 }, + body: '{"testing":true}', + }); + }); + + it('should send only specific property when configured', async () => { + executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({ + sendOnlyProperty: 'testing', + }); + executeFunctions.getInputData.mockReturnValue([{ json: { testing: 'specific-value' } }]); + + await new Amqp().execute.call(executeFunctions); + + expect(mockSender.send).toHaveBeenCalledWith({ + application_properties: {}, + body: '"specific-value"', + }); + }); + + it('should send data as object when configured', async () => { + executeFunctions.getNodeParameter.calledWith('options', 0).mockReturnValue({ + dataAsObject: true, + }); + + await new Amqp().execute.call(executeFunctions); + + expect(mockSender.send).toHaveBeenCalledWith({ + application_properties: {}, + body: { testing: true }, + }); + }); + + it('should handle multiple input items', async () => { + executeFunctions.getInputData.mockReturnValue([{ json: { item: 1 } }, { json: { item: 2 } }]); + + const result = await new Amqp().execute.call(executeFunctions); + + expect(result).toEqual([ + [ + { json: { id: 'test-message-id' }, pairedItems: { item: 0 } }, + { json: { id: 'test-message-id' }, pairedItems: { item: 1 } }, + ], + ]); + expect(mockSender.send).toHaveBeenCalledTimes(2); + expect(mockSender.send).toHaveBeenNthCalledWith(1, { + application_properties: {}, + body: '{"item":1}', + }); + expect(mockSender.send).toHaveBeenNthCalledWith(2, { + application_properties: {}, + body: '{"item":2}', + }); + }); + + it('should continue on fail when configured', async () => { + executeFunctions.continueOnFail.mockReturnValue(true); + executeFunctions.getNodeParameter.calledWith('sink', 0).mockReturnValue(''); + + const result = await new Amqp().execute.call(executeFunctions); + + expect(result).toEqual([ + [{ json: { error: 'Queue or Topic required!' }, pairedItems: { item: 0 } }], + ]); + }); + + describe('credential test', () => { + it('should return success for valid credentials', async () => { + const amqp = new Amqp(); + const testFunctions = mock(); + + // Mock successful connection + mockContainer.on.mockImplementation((event: string, callback: any) => { + if (event === 'connection_open') { + setImmediate(() => callback({})); + } + }); + + const result = await amqp.methods.credentialTest.amqpConnectionTest.call(testFunctions, { + data: credentials, + id: 'test', + name: 'test', + type: 'amqp', + } as ICredentialsDecrypted); + + expect(result).toEqual({ + status: 'OK', + message: 'Connection successful!', + }); + }); + + it('should return error for invalid credentials', async () => { + const amqp = new Amqp(); + const testFunctions = mock(); + + // Mock failed connection + mockContainer.on.mockImplementation((event: string, callback: any) => { + if (event === 'disconnected') { + setImmediate(() => callback({ error: new Error('Authentication failed') })); + } + }); + + const result = await amqp.methods.credentialTest.amqpConnectionTest.call(testFunctions, { + data: credentials, + id: 'test', + name: 'test', + type: 'amqp', + } as ICredentialsDecrypted); + + expect(result).toEqual({ + status: 'Error', + message: 'Authentication failed', + }); + }); + }); +}); diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 70ce348644..2b93c787a1 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -10,21 +10,23 @@ import type { ICredentialDataDecryptedObject, } from 'n8n-workflow'; import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; -import type { Connection, ContainerOptions, Dictionary, EventContext, Sender } from 'rhea'; +import type { Connection, ConnectionOptions, Dictionary, EventContext, Sender } from 'rhea'; import { create_container } from 'rhea'; +import type { AmqpCredential } from './types'; + async function checkIfCredentialsValid( credentials: IDataObject, ): Promise { - const connectOptions: ContainerOptions = { + const connectOptions: ConnectionOptions = { reconnect: false, host: credentials.hostname as string, hostname: credentials.hostname as string, port: credentials.port as number, username: credentials.username ? (credentials.username as string) : undefined, password: credentials.password ? (credentials.password as string) : undefined, - transport: credentials.transportType ? (credentials.transportType as string) : undefined, - }; + transport: credentials.transportType ? (credentials.transportType as 'tcp' | 'tls') : undefined, + } as unknown as ConnectionOptions; let conn: Connection | undefined = undefined; try { @@ -157,7 +159,7 @@ export class Amqp implements INodeType { let sender: Sender | undefined = undefined; try { - const credentials = await this.getCredentials('amqp'); + const credentials = await this.getCredentials('amqp'); // check if credentials are valid to avoid unnecessary reconnects const credentialsTestResult = await checkIfCredentialsValid(credentials); @@ -190,7 +192,8 @@ export class Amqp implements INodeType { /* Values are documented here: https://github.com/amqp/rhea#container */ - const connectOptions: ContainerOptions = { + + const connectOptions: ConnectionOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, @@ -201,7 +204,7 @@ export class Amqp implements INodeType { id: containerId ? containerId : undefined, reconnect: containerReconnect, reconnect_limit: containerReconnectLimit, - }; + } as unknown as ConnectionOptions; const node = this.getNode(); diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.test.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.test.ts new file mode 100644 index 0000000000..f356c4afae --- /dev/null +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.test.ts @@ -0,0 +1,128 @@ +import { NodeOperationError } from 'n8n-workflow'; + +import { testTriggerNode } from '@test/nodes/TriggerHelpers'; + +import { AmqpTrigger } from './AmqpTrigger.node'; + +let eventHandlers: Record void> = {}; +const mockAddCredit = jest.fn(); +const mockClose = jest.fn(); +const mockOpenReceiver = jest.fn(); + +const mockConnection = { + open_receiver: mockOpenReceiver, + close: mockClose, +}; + +jest.mock('rhea', () => ({ + create_container: jest.fn(() => ({ + on: (event: string, handler: (...args: unknown[]) => void) => { + eventHandlers[event] = handler; + }, + removeAllListeners: jest.fn((event: string) => { + delete eventHandlers[event]; + }), + connect: jest.fn(() => mockConnection), + })), +})); + +describe('AMQP Trigger Node', () => { + beforeEach(() => { + jest.clearAllMocks(); + eventHandlers = {}; + }); + + it('should throw if no sink provided', async () => { + await expect( + testTriggerNode(AmqpTrigger, { + mode: 'trigger', + node: { parameters: { sink: '' } }, + credential: { hostname: 'localhost', port: 5672 }, + }), + ).rejects.toThrow(NodeOperationError); + }); + + it('should emit a full message in trigger mode', async () => { + const { emit, close } = await testTriggerNode(AmqpTrigger, { + mode: 'trigger', + node: { parameters: { sink: 'queue://test' } }, + credential: { hostname: 'localhost', port: 5672 }, + }); + + eventHandlers['receiver_open']({ receiver: { add_credit: mockAddCredit } }); + expect(mockAddCredit).toHaveBeenCalledWith(100); + + const message = { body: 'hello', message_id: 1 }; + eventHandlers['message']({ + message, + }); + + expect(emit).toHaveBeenCalledWith([[{ json: message }]]); + await close(); + expect(mockClose).toHaveBeenCalled(); + }); + + it('should parse JSON body when jsonParseBody = true', async () => { + const { emit } = await testTriggerNode(AmqpTrigger, { + mode: 'trigger', + node: { parameters: { sink: 'queue://test', options: { jsonParseBody: true } } }, + credential: { hostname: 'localhost', port: 5672 }, + }); + + eventHandlers['message']({ + message: { body: '{"foo":"bar"}', message_id: 2 }, + }); + + expect(emit).toHaveBeenCalledWith([[{ json: { body: { foo: 'bar' }, message_id: 2 } }]]); + }); + + it('should return only body when onlyBody = true', async () => { + const { emit } = await testTriggerNode(AmqpTrigger, { + mode: 'trigger', + node: { parameters: { sink: 'queue://test', options: { onlyBody: true } } }, + credential: { hostname: 'localhost', port: 5672 }, + }); + + eventHandlers['message']({ + message: { body: { nested: true }, message_id: 3 }, + }); + + expect(emit).toHaveBeenCalledWith([[{ json: { nested: true } }]]); + }); + + it('should reject in manual mode after 15s with no message', async () => { + const timeoutSpy = jest.spyOn(global, 'setTimeout').mockImplementation((fn) => { + fn(); // fire immediately + return 1 as unknown as NodeJS.Timeout; + }); + + const { manualTriggerFunction } = await testTriggerNode(AmqpTrigger, { + mode: 'manual', + node: { parameters: { sink: 'queue://test' } }, + credential: { hostname: 'localhost', port: 5672 }, + }); + + await expect(manualTriggerFunction?.()).rejects.toThrow( + 'Aborted because no message received within 15 seconds', + ); + timeoutSpy.mockRestore(); + }); + + it('should resolve in manual mode when a message arrives', async () => { + const { manualTriggerFunction, emit } = await testTriggerNode(AmqpTrigger, { + mode: 'manual', + node: { parameters: { sink: 'queue://test' } }, + credential: { hostname: 'localhost', port: 5672 }, + }); + + const manualTriggerPromise = manualTriggerFunction?.(); + + eventHandlers['message']({ + message: { body: '{"foo":"bar"}', message_id: 2 }, + }); + + await manualTriggerPromise; + + expect(emit).toHaveBeenCalledWith([[{ json: { body: '{"foo":"bar"}', message_id: 2 } }]]); + }); +}); diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index e4fb9154d9..9ac24af929 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -8,9 +8,11 @@ import type { IRun, } from 'n8n-workflow'; import { deepCopy, jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; -import type { ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; +import type { ConnectionOptions, EventContext, Message, ReceiverOptions } from 'rhea'; import { create_container } from 'rhea'; +import type { AmqpCredential } from './types'; + export class AmqpTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'AMQP Trigger', @@ -136,7 +138,7 @@ export class AmqpTrigger implements INodeType { }; async trigger(this: ITriggerFunctions): Promise { - const credentials = await this.getCredentials('amqp'); + const credentials = await this.getCredentials('amqp'); const sink = this.getNodeParameter('sink', '') as string; const clientname = this.getNodeParameter('clientname', '') as string; @@ -146,7 +148,8 @@ export class AmqpTrigger implements INodeType { const pullMessagesNumber = (options.pullMessagesNumber as number) || 100; const containerId = options.containerId as string; const containerReconnect = (options.reconnect as boolean) || true; - const containerReconnectLimit = (options.reconnectLimit as number) || 50; + // Keep reconnecting (exponential backoff) forever unless user sets a limit + const containerReconnectLimit = (options.reconnectLimit as number) ?? undefined; if (sink === '') { throw new NodeOperationError(this.getNode(), 'Queue or Topic required!'); @@ -227,20 +230,22 @@ export class AmqpTrigger implements INodeType { }); /* - Values are documentet here: https://github.com/amqp/rhea#container + Values are documented here: https://github.com/amqp/rhea#container */ - const connectOptions: ContainerOptions = { + const connectOptions: ConnectionOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, reconnect: containerReconnect, reconnect_limit: containerReconnectLimit, + // Try reconnection even if caused by a fatal error + all_errors_non_fatal: true, username: credentials.username ? credentials.username : undefined, password: credentials.password ? credentials.password : undefined, transport: credentials.transportType ? credentials.transportType : undefined, container_id: containerId ? containerId : undefined, id: containerId ? containerId : undefined, - }; + } as unknown as ConnectionOptions; const connection = container.connect(connectOptions); const clientOptions: ReceiverOptions = { diff --git a/packages/nodes-base/nodes/Amqp/types.ts b/packages/nodes-base/nodes/Amqp/types.ts new file mode 100644 index 0000000000..f8d47d9eb2 --- /dev/null +++ b/packages/nodes-base/nodes/Amqp/types.ts @@ -0,0 +1,7 @@ +export type AmqpCredential = { + hostname: string; + port: number; + username?: string; + password?: string; + transportType?: 'tcp' | 'tls'; +}; diff --git a/packages/nodes-base/nodes/Kafka/test/KafkaTrigger.node.test.ts b/packages/nodes-base/nodes/Kafka/test/KafkaTrigger.node.test.ts index dbce69f021..ae10a2490b 100644 --- a/packages/nodes-base/nodes/Kafka/test/KafkaTrigger.node.test.ts +++ b/packages/nodes-base/nodes/Kafka/test/KafkaTrigger.node.test.ts @@ -310,7 +310,7 @@ describe('KafkaTrigger Node', () => { }); it('should handle manual trigger mode', async () => { - const { emit } = await testTriggerNode(KafkaTrigger, { + const { emit, manualTriggerFunction } = await testTriggerNode(KafkaTrigger, { mode: 'manual', node: { parameters: { @@ -330,6 +330,8 @@ describe('KafkaTrigger Node', () => { }, }); + await manualTriggerFunction?.(); + expect(mockConsumerConnect).toHaveBeenCalledTimes(1); expect(mockConsumerSubscribe).toHaveBeenCalledTimes(1); expect(mockConsumerRun).toHaveBeenCalledTimes(1); diff --git a/packages/nodes-base/nodes/Schedule/test/ScheduleTrigger.node.test.ts b/packages/nodes-base/nodes/Schedule/test/ScheduleTrigger.node.test.ts index aa81a05857..2608d63b79 100644 --- a/packages/nodes-base/nodes/Schedule/test/ScheduleTrigger.node.test.ts +++ b/packages/nodes-base/nodes/Schedule/test/ScheduleTrigger.node.test.ts @@ -108,13 +108,15 @@ describe('ScheduleTrigger', () => { }); it('should emit when manually executed', async () => { - const { emit } = await testTriggerNode(ScheduleTrigger, { + const { emit, manualTriggerFunction } = await testTriggerNode(ScheduleTrigger, { mode: 'manual', timezone, node: { parameters: { rule: { interval: [{ field: 'hours', hoursInterval: 3 }] } } }, workflowStaticData: { recurrenceRules: [] }, }); + await manualTriggerFunction?.(); + expect(emit).toHaveBeenCalledTimes(1); const firstTriggerData = emit.mock.calls[0][0][0][0]; @@ -134,25 +136,26 @@ describe('ScheduleTrigger', () => { }); it('should throw on invalid cron expressions in manual mode', async () => { - await expect( - testTriggerNode(ScheduleTrigger, { - mode: 'manual', - timezone, - node: { - parameters: { - rule: { - interval: [ - { - field: 'cronExpression', - expression: '@daily *', // adding extra fields to shorthand not allowed -> invalid - }, - ], - }, + const { manualTriggerFunction } = await testTriggerNode(ScheduleTrigger, { + mode: 'manual', + timezone, + node: { + parameters: { + rule: { + interval: [ + { + field: 'cronExpression', + expression: '@daily *', // adding extra fields to shorthand not allowed -> invalid + }, + ], }, }, - workflowStaticData: {}, - }), - ).rejects.toBeInstanceOf(n8nWorkflow.NodeOperationError); + }, + workflowStaticData: {}, + }); + await expect(manualTriggerFunction?.()).rejects.toBeInstanceOf( + n8nWorkflow.NodeOperationError, + ); }); }); }); diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 5a803f836a..fae31e64e9 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -936,7 +936,7 @@ "pyodide": "0.28.0", "redis": "4.6.14", "rfc2047": "4.0.1", - "rhea": "1.0.24", + "rhea": "3.0.4", "rrule": "2.8.1", "rss-parser": "3.13.0", "sanitize-html": "2.12.1", diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index 0c1a6b7304..a1f0aefaf5 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -120,11 +120,11 @@ export async function testTriggerNode( if (options.mode === 'manual') { expect(response?.manualTriggerFunction).toBeInstanceOf(Function); - await response?.manualTriggerFunction?.(); } return { close: jest.fn(response?.closeFunction), + manualTriggerFunction: options.mode === 'manual' ? response?.manualTriggerFunction : undefined, emit, }; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c0ba47a229..ce3f3b79f1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3009,8 +3009,8 @@ importers: specifier: 4.0.1 version: 4.0.1 rhea: - specifier: 1.0.24 - version: 1.0.24 + specifier: 3.0.4 + version: 3.0.4 rrule: specifier: 2.8.1 version: 2.8.1 @@ -14694,8 +14694,8 @@ packages: rfdc@1.3.0: resolution: {integrity: sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA==} - rhea@1.0.24: - resolution: {integrity: sha512-PEl62U2EhxCO5wMUZ2/bCBcXAVKN9AdMSNQOrp3+R5b77TEaOSiy16MQ0sIOmzj/iqsgIAgPs1mt3FYfu1vIXA==} + rhea@3.0.4: + resolution: {integrity: sha512-n3kw8syCdrsfJ72w3rohpoHHlmv/RZZEP9VY5BVjjo0sEGIt4YSKypBgaiA+OUSgJAzLjOECYecsclG5xbYtZw==} rimraf@2.6.3: resolution: {integrity: sha512-mwqeW5XsA2qAejG46gYdENaxXjx9onRNCfn7L0duuP4hCuTIi/QO7PDK07KJfp1d+izWPrzEJDcSqBa0OZQriA==} @@ -31397,9 +31397,9 @@ snapshots: rfdc@1.3.0: {} - rhea@1.0.24: + rhea@3.0.4: dependencies: - debug: 3.2.7(supports-color@5.5.0) + debug: 4.4.1(supports-color@8.1.1) transitivePeerDependencies: - supports-color