Additional improvements to AMQP node

This commit is contained in:
Jan Oberhauser
2020-11-11 08:45:31 +01:00
parent da5fddad5a
commit 702a8bf3bf
2 changed files with 71 additions and 70 deletions

View File

@@ -1,6 +1,6 @@
import { ContainerOptions, Delivery } from 'rhea';
import { IExecuteSingleFunctions } from 'n8n-core';
import { IExecuteFunctions } from 'n8n-core';
import {
IDataObject,
INodeExecutionData,
@@ -66,20 +66,18 @@ export class Amqp implements INodeType {
},
],
},
]
],
};
async executeSingle(this: IExecuteSingleFunctions): Promise<INodeExecutionData> {
const item = this.getInputData();
async execute(this: IExecuteFunctions): Promise < INodeExecutionData[][] > {
const credentials = this.getCredentials('amqp');
if (!credentials) {
throw new Error('Credentials are mandatory!');
}
const sink = this.getNodeParameter('sink', '') as string;
const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object;
const options = this.getNodeParameter('options', {}) as IDataObject;
const sink = this.getNodeParameter('sink', 0, '') as string;
const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as string | object;
const options = this.getNodeParameter('options', 0, {}) as IDataObject;
let headerProperties = applicationProperties;
if (typeof applicationProperties === 'string' && applicationProperties !== '') {
@@ -109,39 +107,43 @@ export class Amqp implements INodeType {
connectOptions.transport = credentials.transportType;
}
const allSent = new Promise(( resolve ) => {
container.once('sendable', (context: any) => { // tslint:disable-line:no-any
let body: IDataObject | string = item.json;
const sendOnlyProperty = options.sendOnlyProperty as string;
if (sendOnlyProperty) {
body = body[sendOnlyProperty] as string;
}
if (options.dataAsObject !== true) {
body = JSON.stringify(body);
}
const message = {
application_properties: headerProperties,
body
};
const sendResult = context.sender.send(message);
resolve(sendResult);
});
});
const conn = container.connect(connectOptions);
const sender = conn.open_sender(sink);
const sendResult: Delivery = await allSent as Delivery; // sendResult has a a property that causes circular reference if returned
const responseData: IDataObject[] = await new Promise((resolve) => {
container.once('sendable', (context: any) => { // tslint:disable-line:no-any
const returnData = [];
const items = this.getInputData();
for (let i = 0; i < items.length; i++) {
const item = items[i];
let body: IDataObject | string = item.json;
const sendOnlyProperty = options.sendOnlyProperty as string;
if (sendOnlyProperty) {
body = body[sendOnlyProperty] as string;
}
if (options.dataAsObject !== true) {
body = JSON.stringify(body);
}
const result = context.sender.send({
application_properties: headerProperties,
body,
});
returnData.push({ id: result.id });
}
resolve(returnData);
});
});
sender.close();
conn.close();
return { json: { id: sendResult.id } } as INodeExecutionData;
return [this.helpers.returnJsonArray(responseData)];
}
}