feat(RabbitMQ Trigger Node): Make message acknowledgement and parallel processing configurable (#3385)

* feat(RabbitMQ Trigger Node): Make message acknowledgement and concurrent
processing configurable

*  Make sure that messages do not get executed multiple times

* 👕 Fix lint issue

* 🐛 Fix issue that for manual executions in "own" mode messages got
know acknowledged

*  Increment count now that console.log got removed

*  Improvements

*  Fix default value

*  Improve display name
This commit is contained in:
Jan Oberhauser
2022-05-30 12:16:44 +02:00
committed by GitHub
parent d7c6833dc3
commit b851289001
9 changed files with 336 additions and 72 deletions

View File

@@ -1,11 +1,14 @@
import {
createDeferredPromise,
IDataObject,
INodeExecutionData,
INodeProperties,
INodeType,
INodeTypeDescription,
IRun,
ITriggerFunctions,
ITriggerResponse,
LoggerProxy as Logger,
} from 'n8n-workflow';
import {
@@ -13,9 +16,12 @@ import {
} from './DefaultOptions';
import {
MessageTracker,
rabbitmqConnectQueue,
} from './GenericFunctions';
import * as amqplib from 'amqplib';
export class RabbitMQTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'RabbitMQ Trigger',
@@ -42,7 +48,7 @@ export class RabbitMQTrigger implements INodeType {
type: 'string',
default: '',
placeholder: 'queue-name',
description: 'Name of the queue to publish to',
description: 'The name of the queue to read from',
},
{
@@ -59,6 +65,30 @@ export class RabbitMQTrigger implements INodeType {
default: false,
description: 'Saves the content as binary',
},
{
displayName: 'Delete from queue when',
name: 'acknowledge',
type: 'options',
options: [
{
name: 'Execution finishes',
value: 'executionFinishes',
description: 'After the workflow execution finished. No matter if the execution was successful or not.',
},
{
name: 'Execution finishes successfully',
value: 'executionFinishesSuccessfully',
description: 'After the workflow execution finished successfully',
},
{
name: 'Immediately',
value: 'immediately',
description: 'As soon as the message got received',
},
],
default: 'immediately',
description: 'When to acknowledge the message',
},
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
@@ -87,6 +117,21 @@ export class RabbitMQTrigger implements INodeType {
default: false,
description: 'Returns only the content property',
},
// eslint-disable-next-line n8n-nodes-base/node-param-default-missing
{
displayName: 'Parallel message processing limit',
name: 'parallelMessages',
type: 'number',
default: -1,
displayOptions: {
hide: {
acknowledge: [
'immediately',
],
},
},
description: 'Max number of executions at a time. Use -1 for no limit.',
},
...rabbitDefaultOptions,
].sort((a, b) => {
if ((a as INodeProperties).displayName.toLowerCase() < (b as INodeProperties).displayName.toLowerCase()) { return -1; }
@@ -106,42 +151,117 @@ export class RabbitMQTrigger implements INodeType {
const self = this;
let parallelMessages = (options.parallelMessages !== undefined && options.parallelMessages !== -1) ? parseInt(options.parallelMessages as string, 10) : -1;
if (parallelMessages === 0 || parallelMessages < -1) {
throw new Error('Parallel message processing limit must be greater than zero (or -1 for no limit)');
}
if (this.getMode() === 'manual') {
// Do only catch a single message when executing manually, else messages will leak
parallelMessages = 1;
}
let acknowledgeMode = options.acknowledge ? options.acknowledge : 'immediately';
if (parallelMessages !== -1 && acknowledgeMode === 'immediately') {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes';
}
const messageTracker = new MessageTracker();
let consumerTag: string;
const startConsumer = async () => {
await channel.consume(queue, async (message: IDataObject) => {
if (parallelMessages !== -1) {
channel.prefetch(parallelMessages);
}
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
let content: IDataObject | string = message!.content!.toString();
const item: INodeExecutionData = {
json: {},
};
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
let content: IDataObject | string = message!.content!.toString();
const item: INodeExecutionData = {
json: {},
};
item.json = message;
message.content = undefined;
} else {
if (options.jsonParseBody === true) {
content = JSON.parse(content as string);
}
if (options.onlyContent === true) {
item.json = content as IDataObject;
} else {
message.content = content;
item.json = message;
}
}
if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
};
self.emit([
[
item,
],
]);
channel.ack(message);
item.json = message as unknown as IDataObject;
message.content = undefined as unknown as Buffer;
} else {
if (options.jsonParseBody === true) {
content = JSON.parse(content as string);
}
if (options.onlyContent === true) {
item.json = content as IDataObject;
} else {
message.content = content as unknown as Buffer;
item.json = message as unknown as IDataObject;
}
}
let responsePromise = undefined;
if (acknowledgeMode !== 'immediately') {
responsePromise = await createDeferredPromise<IRun>();
}
self.emit([
[
item,
],
], undefined, responsePromise);
if (responsePromise) {
// Acknowledge message after the execution finished
await responsePromise
.promise()
.then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
Logger.error(`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
}
});
consumerTag = consumerInfo.consumerTag;
};
startConsumer();
@@ -149,23 +269,23 @@ export class RabbitMQTrigger implements INodeType {
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
await channel.close();
await channel.connection.close();
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually. So the function has to make sure that
// the emit() gets called with similar data like when it
// would trigger by itself so that the user knows what data
// to expect.
async function manualTriggerFunction() {
startConsumer();
try {
return messageTracker.closeChannel(channel, consumerTag);
} catch(error) {
const workflow = self.getWorkflow();
const node = self.getNode();
Logger.error(`There was a problem closing the RabbitMQ Trigger node connection "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
}
return {
closeFunction,
manualTriggerFunction,
};
}