diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 3fd8133711..653d0dff92 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,4 +1,4 @@ -import { ContainerOptions, Delivery } from 'rhea'; +import { ContainerOptions, Delivery, Dictionary, EventContext } from 'rhea'; import { IExecuteFunctions } from 'n8n-core'; import { @@ -64,6 +64,27 @@ export class Amqp implements INodeType { default: '', description: 'The only property to send. If empty the whole item will be sent.', }, + { + displayName: 'Container ID', + name: 'containerID', + type: 'string', + default: '', + description: 'Will be used to pass to the RHEA Backend as container_id', + }, + { + displayName: 'Reconnect', + name: 'reconnect', + type: 'boolean', + default: true, + description: 'If on, the library will automatically attempt to reconnect if disconnected', + }, + { + displayName: 'Reconnect limit', + name: 'reconnectLimit', + type: 'number', + default: 50, + description: 'maximum number of reconnect attempts', + }, ], }, ], @@ -78,11 +99,16 @@ export class Amqp implements INodeType { const sink = this.getNodeParameter('sink', 0, '') as string; const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as string | object; const options = this.getNodeParameter('options', 0, {}) as IDataObject; + const container_id = options.containerID as string; + const containerReconnect = options.reconnect as boolean || true ; + const containerReconnectLimit = options.reconnectLimit as number || 50; - let headerProperties = applicationProperties; + let headerProperties : Dictionary; if (typeof applicationProperties === 'string' && applicationProperties !== '') { headerProperties = JSON.parse(applicationProperties); - } + } else { + headerProperties = applicationProperties as object; + } if (sink === '') { throw new Error('Queue or Topic required!'); @@ -90,28 +116,27 @@ export class Amqp implements INodeType { const container = require('rhea'); + /* + Values are documentet here: https://github.com/amqp/rhea#container + */ const connectOptions: ContainerOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: true, // this id the default anyway - reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm + reconnect: containerReconnect, + reconnect_limit: containerReconnectLimit, + username: credentials.username ? credentials.username : undefined, + password: credentials.password ? credentials.password : undefined, + transport: credentials.transportType ? credentials.transportType : undefined, + container_id: container_id ? container_id : undefined, + id: container_id ? container_id : undefined, }; - if (credentials.username || credentials.password) { - container.options.username = credentials.username; - container.options.password = credentials.password; - connectOptions.username = credentials.username; - connectOptions.password = credentials.password; - } - if (credentials.transportType !== '') { - connectOptions.transport = credentials.transportType; - } - const conn = container.connect(connectOptions); + const sender = conn.open_sender(sink); const responseData: IDataObject[] = await new Promise((resolve) => { - container.once('sendable', (context: any) => { // tslint:disable-line:no-any + container.once('sendable', (context: EventContext) => { const returnData = []; const items = this.getInputData(); @@ -129,12 +154,12 @@ export class Amqp implements INodeType { body = JSON.stringify(body); } - const result = context.sender.send({ + const result = context.sender?.send({ application_properties: headerProperties, body, }); - returnData.push({ id: result.id }); + returnData.push({ id: result?.id }); } resolve(returnData);