diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index 3b9cd60c6f..c1b332fe18 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -177,6 +177,20 @@ export class RabbitMQ implements INodeType { default: {}, placeholder: 'Add Option', options: [ + { + displayName: 'Alternate Exchange', + name: 'alternateExchange', + type: 'string', + displayOptions: { + show: { + '/mode': [ + 'exchange', + ], + }, + }, + default: '', + description: 'An exchange to send messages to if this exchange can’t route them to any queues', + }, { displayName: 'Arguments', name: 'arguments', @@ -208,37 +222,6 @@ export class RabbitMQ implements INodeType { }, ], }, - { - displayName: 'Headers', - name: 'headers', - placeholder: 'Add Header', - description: 'Headers to add.', - type: 'fixedCollection', - typeOptions: { - multipleValues: true, - }, - default: {}, - options: [ - { - name: 'header', - displayName: 'Header', - values: [ - { - displayName: 'Key', - name: 'key', - type: 'string', - default: '', - }, - { - displayName: 'Value', - name: 'value', - type: 'string', - default: '', - }, - ], - }, - ], - }, { displayName: 'Auto Delete', name: 'autoDelete', @@ -268,18 +251,35 @@ export class RabbitMQ implements INodeType { description: 'Scopes the queue to the connection.', }, { - displayName: 'Alternate Exchange', - name: 'alternateExchange', - type: 'string', - displayOptions: { - show: { - '/mode': [ - 'exchange', + displayName: 'Headers', + name: 'headers', + placeholder: 'Add Header', + description: 'Headers to add.', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'header', + displayName: 'Header', + values: [ + { + displayName: 'Key', + name: 'key', + type: 'string', + default: '', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, ], }, - }, - default: '', - description: 'An exchange to send messages to if this exchange can’t route them to any queues', + ], }, ], }, @@ -287,7 +287,7 @@ export class RabbitMQ implements INodeType { }; async execute(this: IExecuteFunctions): Promise { - let channel; + let channel, options: IDataObject; try { const items = this.getInputData(); const mode = this.getNodeParameter('mode', 0) as string; @@ -297,7 +297,7 @@ export class RabbitMQ implements INodeType { if (mode === 'queue') { const queue = this.getNodeParameter('queue', 0) as string; - const options = this.getNodeParameter('options', 0, {}) as IDataObject; + options = this.getNodeParameter('options', 0, {}) as IDataObject; channel = await rabbitmqConnectQueue.call(this, queue, options); @@ -313,7 +313,17 @@ export class RabbitMQ implements INodeType { message = this.getNodeParameter('message', i) as string; } - queuePromises.push(channel.sendToQueue(queue, Buffer.from(message))); + let headers: IDataObject = {}; + if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) { + let itemOptions = this.getNodeParameter('options', i, {}) as IDataObject; + const additionalHeaders: IDataObject = {}; + ((itemOptions.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => { + additionalHeaders[header.key as string] = header.value; + }); + headers = additionalHeaders; + } + + queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers })); } // @ts-ignore @@ -351,16 +361,7 @@ export class RabbitMQ implements INodeType { const type = this.getNodeParameter('exchangeType', 0) as string; const routingKey = this.getNodeParameter('routingKey', 0) as string; - const options = this.getNodeParameter('options', 0, {}) as IDataObject; - - let headers : IDataObject = {}; - if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) { - const additionalHeaders: IDataObject = {}; - ((options.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => { - additionalHeaders[header.key as string] = header.value; - }); - headers = additionalHeaders; - } + options = this.getNodeParameter('options', 0, {}) as IDataObject; channel = await rabbitmqConnectExchange.call(this, exchange, type, options); @@ -376,7 +377,17 @@ export class RabbitMQ implements INodeType { message = this.getNodeParameter('message', i) as string; } - exchangePromises.push(channel.publish(exchange, routingKey, Buffer.from(message), {headers})); + let headers: IDataObject = {}; + if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) { + let itemOptions = this.getNodeParameter('options', i, {}) as IDataObject; + const additionalHeaders: IDataObject = {}; + ((itemOptions.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => { + additionalHeaders[header.key as string] = header.value; + }); + headers = additionalHeaders; + } + + exchangePromises.push(channel.publish(exchange, routingKey, Buffer.from(message), { headers })); } // @ts-ignore