Add exchange to RabbitMQ (#1328)

* Implement exchange for rabbitmq

* Fix options not parsing displayOptions

* Cleanup code in generic functions

*  Minor changes to (#1300)

Co-authored-by: Pieter Jong <jongpieter@hotmail.com>
This commit is contained in:
Ricardo Espinoza
2021-01-13 02:57:06 -05:00
committed by GitHub
parent 0638f9624d
commit ea873aa8dc
3 changed files with 319 additions and 57 deletions

View File

@@ -6,7 +6,7 @@ import {
const amqplib = require('amqplib');
export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
const credentials = this.getCredentials('rabbitmq') as IDataObject;
const credentialKeys = [
@@ -16,6 +16,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
'password',
'vhost',
];
const credentialData: IDataObject = {};
credentialKeys.forEach(key => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
@@ -51,12 +52,36 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
options.arguments = additionalArguments;
}
await channel.assertQueue(queue, options);
resolve(channel);
} catch (error) {
reject(error);
}
});
}
export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
const channel = await rabbitmqConnect.call(this, options);
return new Promise(async (resolve, reject) => {
try {
await channel.assertQueue(queue, options);
resolve(channel);
} catch (error) {
reject(error);
}
});
}
export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
const channel = await rabbitmqConnect.call(this, options);
return new Promise(async (resolve, reject) => {
try {
await channel.assertExchange(exchange, type, options);
resolve(channel);
} catch (error) {
reject(error);
}
});
}