feat: Enable parallel processing on multiple queue nodes (#6295)

* Add non-parallel execution

* Add parallel processing for MQTT

* Fix logic expression for trigger

* fixes

* remove unused import

* fix MQTT parallel processing

* fix AMQPTrigger node parallelProcessing

* MQTTTrigger node default parallelProcessing to true

* add AMQP credential test

* improve error handling

---------

Co-authored-by: Marcus <marcus@n8n.io>
This commit is contained in:
agobrech
2023-08-16 13:06:47 +02:00
committed by GitHub
parent 198a977f57
commit 44afcff959
4 changed files with 95 additions and 6 deletions

View File

@@ -4,6 +4,8 @@ import type {
INodeType,
INodeTypeDescription,
ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
@@ -71,6 +73,14 @@ export class MqttTrigger implements INodeType {
default: false,
description: 'Whether to return only the message property',
},
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
description:
'Whether to process messages in parallel or by keeping the message in order',
},
],
},
],
@@ -89,6 +99,7 @@ export class MqttTrigger implements INodeType {
}
const options = this.getNodeParameter('options') as IDataObject;
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;
if (!topics) {
throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
@@ -147,7 +158,7 @@ export class MqttTrigger implements INodeType {
if (error) {
reject(error);
}
client.on('message', (topic: string, message: Buffer | string) => {
client.on('message', async (topic: string, message: Buffer | string) => {
let result: IDataObject = {};
message = message.toString();
@@ -165,7 +176,15 @@ export class MqttTrigger implements INodeType {
//@ts-ignore
result = [message as string];
}
this.emit([this.helpers.returnJsonArray(result)]);
let responsePromise: IDeferredPromise<IRun> | undefined;
if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise();
}
this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);
if (responsePromise) {
await responsePromise.promise();
}
resolve(true);
});
});