refactor(RabbitMQ Trigger Node): Improve type-safety, add tests, and fix issues with manual triggers (#10663)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-09-05 08:11:38 +02:00
committed by GitHub
parent a5a92ec8b1
commit e50f0e6a4e
6 changed files with 470 additions and 215 deletions

View File

@@ -1,9 +1,8 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import type { Message } from 'amqplib';
import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
INodeExecutionData,
INodeProperties,
INodeType,
INodeTypeDescription,
@@ -15,7 +14,8 @@ import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { rabbitDefaultOptions } from './DefaultOptions';
import { MessageTracker, rabbitmqConnectQueue } from './GenericFunctions';
import { MessageTracker, rabbitmqConnectQueue, parseMessage } from './GenericFunctions';
import type { TriggerOptions } from './types';
export class RabbitMQTrigger implements INodeType {
description: INodeTypeDescription = {
@@ -205,28 +205,50 @@ export class RabbitMQTrigger implements INodeType {
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const queue = this.getNodeParameter('queue') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
const options = this.getNodeParameter('options', {}) as TriggerOptions;
const channel = await rabbitmqConnectQueue.call(this, queue, options);
let parallelMessages =
options.parallelMessages !== undefined && options.parallelMessages !== -1
? parseInt(options.parallelMessages as string, 10)
: -1;
if (this.getMode() === 'manual') {
const manualTriggerFunction = async () => {
// Do only catch a single message when executing manually, else messages will leak
await channel.prefetch(1);
if (parallelMessages === 0 || parallelMessages < -1) {
const processMessage = async (message: Message | null) => {
if (message !== null) {
const item = await parseMessage(message, options, this.helpers);
channel.ack(message);
this.emit([[item]]);
} else {
this.emitError(new Error('Connection got closed unexpectedly'));
}
};
const existingMessage = await channel.get(queue);
if (existingMessage) await processMessage(existingMessage);
else await channel.consume(queue, processMessage);
};
const closeFunction = async () => {
await channel.close();
await channel.connection.close();
return;
};
return {
closeFunction,
manualTriggerFunction,
};
}
const parallelMessages = options.parallelMessages ?? -1;
if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) {
throw new NodeOperationError(
this.getNode(),
'Parallel message processing limit must be greater than zero (or -1 for no limit)',
'Parallel message processing limit must be a number greater than zero (or -1 for no limit)',
);
}
if (this.getMode() === 'manual') {
// Do only catch a single message when executing manually, else messages will leak
parallelMessages = 1;
}
let acknowledgeMode = options.acknowledge ? options.acknowledge : 'immediately';
let acknowledgeMode = options.acknowledge ?? 'immediately';
if (parallelMessages !== -1 && acknowledgeMode === 'immediately') {
// If parallel message limit is set, then the default mode is "executionFinishes"
@@ -236,108 +258,82 @@ export class RabbitMQTrigger implements INodeType {
}
const messageTracker = new MessageTracker();
let consumerTag: string;
let closeGotCalled = false;
const startConsumer = async () => {
if (parallelMessages !== -1) {
await channel.prefetch(parallelMessages);
if (parallelMessages !== -1) {
await channel.prefetch(parallelMessages);
}
channel.on('close', () => {
if (!closeGotCalled) {
this.emitError(new Error('Connection got closed unexpectedly'));
}
});
channel.on('close', () => {
if (!closeGotCalled) {
this.emitError(new Error('Connection got closed unexpectedly'));
}
});
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
let content: IDataObject | string = message.content.toString();
const item: INodeExecutionData = {
json: {},
};
if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
};
item.json = message as unknown as IDataObject;
message.content = undefined as unknown as Buffer;
} else {
if (options.jsonParseBody === true) {
content = JSON.parse(content);
}
if (options.onlyContent === true) {
item.json = content as IDataObject;
} else {
message.content = content as unknown as Buffer;
item.json = message as unknown as IDataObject;
}
}
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook =
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => {
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
this.logger.error(
`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
const item = await parseMessage(message, options, this.helpers);
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook =
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => {
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
this.logger.error(
`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
});
consumerTag = consumerInfo.consumerTag;
};
await startConsumer();
}
});
const consumerTag = consumerInfo.consumerTag;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.