Minor improvements to RabbitMQ headers

This commit is contained in:
Jan Oberhauser
2021-04-04 10:33:53 +02:00
parent 51f066cbcd
commit 55c67d8a20

View File

@@ -177,6 +177,20 @@ export class RabbitMQ implements INodeType {
default: {}, default: {},
placeholder: 'Add Option', placeholder: 'Add Option',
options: [ options: [
{
displayName: 'Alternate Exchange',
name: 'alternateExchange',
type: 'string',
displayOptions: {
show: {
'/mode': [
'exchange',
],
},
},
default: '',
description: 'An exchange to send messages to if this exchange cant route them to any queues',
},
{ {
displayName: 'Arguments', displayName: 'Arguments',
name: 'arguments', name: 'arguments',
@@ -208,37 +222,6 @@ export class RabbitMQ implements INodeType {
}, },
], ],
}, },
{
displayName: 'Headers',
name: 'headers',
placeholder: 'Add Header',
description: 'Headers to add.',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'header',
displayName: 'Header',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
{ {
displayName: 'Auto Delete', displayName: 'Auto Delete',
name: 'autoDelete', name: 'autoDelete',
@@ -268,18 +251,35 @@ export class RabbitMQ implements INodeType {
description: 'Scopes the queue to the connection.', description: 'Scopes the queue to the connection.',
}, },
{ {
displayName: 'Alternate Exchange', displayName: 'Headers',
name: 'alternateExchange', name: 'headers',
type: 'string', placeholder: 'Add Header',
displayOptions: { description: 'Headers to add.',
show: { type: 'fixedCollection',
'/mode': [ typeOptions: {
'exchange', multipleValues: true,
},
default: {},
options: [
{
name: 'header',
displayName: 'Header',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
], ],
}, },
}, ],
default: '',
description: 'An exchange to send messages to if this exchange cant route them to any queues',
}, },
], ],
}, },
@@ -287,7 +287,7 @@ export class RabbitMQ implements INodeType {
}; };
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let channel; let channel, options: IDataObject;
try { try {
const items = this.getInputData(); const items = this.getInputData();
const mode = this.getNodeParameter('mode', 0) as string; const mode = this.getNodeParameter('mode', 0) as string;
@@ -297,7 +297,7 @@ export class RabbitMQ implements INodeType {
if (mode === 'queue') { if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string; const queue = this.getNodeParameter('queue', 0) as string;
const options = this.getNodeParameter('options', 0, {}) as IDataObject; options = this.getNodeParameter('options', 0, {}) as IDataObject;
channel = await rabbitmqConnectQueue.call(this, queue, options); channel = await rabbitmqConnectQueue.call(this, queue, options);
@@ -313,7 +313,17 @@ export class RabbitMQ implements INodeType {
message = this.getNodeParameter('message', i) as string; message = this.getNodeParameter('message', i) as string;
} }
queuePromises.push(channel.sendToQueue(queue, Buffer.from(message))); let headers: IDataObject = {};
if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) {
let itemOptions = this.getNodeParameter('options', i, {}) as IDataObject;
const additionalHeaders: IDataObject = {};
((itemOptions.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => {
additionalHeaders[header.key as string] = header.value;
});
headers = additionalHeaders;
}
queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers }));
} }
// @ts-ignore // @ts-ignore
@@ -351,16 +361,7 @@ export class RabbitMQ implements INodeType {
const type = this.getNodeParameter('exchangeType', 0) as string; const type = this.getNodeParameter('exchangeType', 0) as string;
const routingKey = this.getNodeParameter('routingKey', 0) as string; const routingKey = this.getNodeParameter('routingKey', 0) as string;
const options = this.getNodeParameter('options', 0, {}) as IDataObject; options = this.getNodeParameter('options', 0, {}) as IDataObject;
let headers : IDataObject = {};
if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) {
const additionalHeaders: IDataObject = {};
((options.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => {
additionalHeaders[header.key as string] = header.value;
});
headers = additionalHeaders;
}
channel = await rabbitmqConnectExchange.call(this, exchange, type, options); channel = await rabbitmqConnectExchange.call(this, exchange, type, options);
@@ -376,7 +377,17 @@ export class RabbitMQ implements INodeType {
message = this.getNodeParameter('message', i) as string; message = this.getNodeParameter('message', i) as string;
} }
exchangePromises.push(channel.publish(exchange, routingKey, Buffer.from(message), {headers})); let headers: IDataObject = {};
if (options.headers && ((options.headers as IDataObject).header! as IDataObject[]).length) {
let itemOptions = this.getNodeParameter('options', i, {}) as IDataObject;
const additionalHeaders: IDataObject = {};
((itemOptions.headers as IDataObject).header as IDataObject[]).forEach((header: IDataObject) => {
additionalHeaders[header.key as string] = header.value;
});
headers = additionalHeaders;
}
exchangePromises.push(channel.publish(exchange, routingKey, Buffer.from(message), { headers }));
} }
// @ts-ignore // @ts-ignore