mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
refactor(MQTT Node): Refactor, fix duplicate triggers, and add Unit tests (#9847)
Co-authored-by: Elias Meire <elias@meire.dev>
This commit is contained in:
committed by
GitHub
parent
e51de9d391
commit
164ec72c0d
@@ -1,16 +1,22 @@
|
||||
import type { ISubscriptionMap } from 'mqtt';
|
||||
import type { QoS } from 'mqtt-packet';
|
||||
import type {
|
||||
ITriggerFunctions,
|
||||
IDataObject,
|
||||
INodeType,
|
||||
INodeTypeDescription,
|
||||
ITriggerResponse,
|
||||
IDeferredPromise,
|
||||
IRun,
|
||||
} from 'n8n-workflow';
|
||||
import { NodeOperationError, randomString } from 'n8n-workflow';
|
||||
import { NodeOperationError } from 'n8n-workflow';
|
||||
|
||||
import * as mqtt from 'mqtt';
|
||||
import { formatPrivateKey } from '@utils/utilities';
|
||||
import { createClient, type MqttCredential } from './GenericFunctions';
|
||||
|
||||
interface Options {
|
||||
jsonParseBody: boolean;
|
||||
onlyMessage: boolean;
|
||||
parallelProcessing: boolean;
|
||||
}
|
||||
|
||||
export class MqttTrigger implements INodeType {
|
||||
description: INodeTypeDescription = {
|
||||
@@ -87,120 +93,64 @@ export class MqttTrigger implements INodeType {
|
||||
};
|
||||
|
||||
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
|
||||
const credentials = await this.getCredentials('mqtt');
|
||||
|
||||
const topics = (this.getNodeParameter('topics') as string).split(',');
|
||||
|
||||
const topicsQoS: IDataObject = {};
|
||||
|
||||
for (const data of topics) {
|
||||
const [topic, qos] = data.split(':');
|
||||
topicsQoS[topic] = qos ? { qos: parseInt(qos, 10) } : { qos: 0 };
|
||||
}
|
||||
|
||||
const options = this.getNodeParameter('options') as IDataObject;
|
||||
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;
|
||||
|
||||
if (!topics) {
|
||||
if (!topics?.length) {
|
||||
throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
|
||||
}
|
||||
|
||||
const protocol = (credentials.protocol as string) || 'mqtt';
|
||||
const host = credentials.host as string;
|
||||
const brokerUrl = `${protocol}://${host}`;
|
||||
const port = (credentials.port as number) || 1883;
|
||||
const clientId = (credentials.clientId as string) || `mqttjs_${randomString(8).toLowerCase()}`;
|
||||
const clean = credentials.clean as boolean;
|
||||
const ssl = credentials.ssl as boolean;
|
||||
const ca = formatPrivateKey(credentials.ca as string);
|
||||
const cert = formatPrivateKey(credentials.cert as string);
|
||||
const key = formatPrivateKey(credentials.key as string);
|
||||
const rejectUnauthorized = credentials.rejectUnauthorized as boolean;
|
||||
|
||||
let client: mqtt.MqttClient;
|
||||
|
||||
if (!ssl) {
|
||||
const clientOptions: mqtt.IClientOptions = {
|
||||
port,
|
||||
clean,
|
||||
clientId,
|
||||
};
|
||||
|
||||
if (credentials.username && credentials.password) {
|
||||
clientOptions.username = credentials.username as string;
|
||||
clientOptions.password = credentials.password as string;
|
||||
}
|
||||
|
||||
client = mqtt.connect(brokerUrl, clientOptions);
|
||||
} else {
|
||||
const clientOptions: mqtt.IClientOptions = {
|
||||
port,
|
||||
clean,
|
||||
clientId,
|
||||
ca,
|
||||
cert,
|
||||
key,
|
||||
rejectUnauthorized,
|
||||
};
|
||||
if (credentials.username && credentials.password) {
|
||||
clientOptions.username = credentials.username as string;
|
||||
clientOptions.password = credentials.password as string;
|
||||
}
|
||||
|
||||
client = mqtt.connect(brokerUrl, clientOptions);
|
||||
const topicsQoS: ISubscriptionMap = {};
|
||||
for (const data of topics) {
|
||||
const [topic, qosString] = data.split(':');
|
||||
let qos = qosString ? parseInt(qosString, 10) : 0;
|
||||
if (qos < 0 || qos > 2) qos = 0;
|
||||
topicsQoS[topic] = { qos: qos as QoS };
|
||||
}
|
||||
|
||||
const manualTriggerFunction = async () => {
|
||||
await new Promise((resolve, reject) => {
|
||||
client.on('connect', () => {
|
||||
client.subscribe(topicsQoS as mqtt.ISubscriptionMap, (error, _granted) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
client.on('message', async (topic: string, message: Buffer | string) => {
|
||||
let result: IDataObject = {};
|
||||
const options = this.getNodeParameter('options') as Options;
|
||||
const credentials = (await this.getCredentials('mqtt')) as unknown as MqttCredential;
|
||||
const client = await createClient(credentials);
|
||||
|
||||
message = message.toString();
|
||||
const parsePayload = (topic: string, payload: Buffer) => {
|
||||
let message = payload.toString();
|
||||
|
||||
if (options.jsonParseBody) {
|
||||
try {
|
||||
message = JSON.parse(message.toString());
|
||||
} catch (e) {}
|
||||
}
|
||||
if (options.jsonParseBody) {
|
||||
try {
|
||||
message = JSON.parse(message);
|
||||
} catch (e) {}
|
||||
}
|
||||
|
||||
result.message = message;
|
||||
result.topic = topic;
|
||||
let result: IDataObject = { message, topic };
|
||||
|
||||
if (options.onlyMessage) {
|
||||
//@ts-ignore
|
||||
result = [message as string];
|
||||
}
|
||||
if (options.onlyMessage) {
|
||||
//@ts-ignore
|
||||
result = [message];
|
||||
}
|
||||
|
||||
let responsePromise: IDeferredPromise<IRun> | undefined;
|
||||
if (!parallelProcessing) {
|
||||
responsePromise = await this.helpers.createDeferredPromise();
|
||||
}
|
||||
this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);
|
||||
if (responsePromise) {
|
||||
await responsePromise.promise();
|
||||
}
|
||||
resolve(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
client.on('error', (error) => {
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
return [this.helpers.returnJsonArray([result])];
|
||||
};
|
||||
|
||||
const manualTriggerFunction = async () =>
|
||||
await new Promise<void>(async (resolve) => {
|
||||
client.once('message', (topic, payload) => {
|
||||
this.emit(parsePayload(topic, payload));
|
||||
resolve();
|
||||
});
|
||||
await client.subscribeAsync(topicsQoS);
|
||||
});
|
||||
|
||||
if (this.getMode() === 'trigger') {
|
||||
void manualTriggerFunction();
|
||||
const donePromise = !options.parallelProcessing
|
||||
? await this.helpers.createDeferredPromise<IRun>()
|
||||
: undefined;
|
||||
client.on('message', async (topic, payload) => {
|
||||
this.emit(parsePayload(topic, payload), undefined, donePromise);
|
||||
await donePromise?.promise();
|
||||
});
|
||||
await client.subscribeAsync(topicsQoS);
|
||||
}
|
||||
|
||||
async function closeFunction() {
|
||||
client.end();
|
||||
await client.endAsync();
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user