diff --git a/packages/nodes-base/credentials/Mqtt.credentials.ts b/packages/nodes-base/credentials/Mqtt.credentials.ts new file mode 100644 index 0000000000..bcc419a524 --- /dev/null +++ b/packages/nodes-base/credentials/Mqtt.credentials.ts @@ -0,0 +1,59 @@ +import { + ICredentialType, + NodePropertyTypes, +} from 'n8n-workflow'; + + +export class Mqtt implements ICredentialType { + name = 'mqtt'; + displayName = 'MQTT'; + properties = [ + // The credentials to get from user and save encrypted. + // Properties can be defined exactly in the same way + // as node properties. + { + displayName: 'Protocol', + name: 'protocol', + type: 'options' as NodePropertyTypes, + options: [ + { + name: 'mqtt', + value: 'mqtt', + }, + { + name: 'ws', + value: 'ws', + }, + ], + default: 'mqtt', + }, + { + displayName: 'Host', + name: 'host', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Port', + name: 'port', + type: 'number' as NodePropertyTypes, + default: 1883, + }, + { + displayName: 'Username', + name: 'username', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Password', + name: 'password', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + default: '', + }, + ]; +} + diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts new file mode 100644 index 0000000000..0344f290b3 --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -0,0 +1,158 @@ +import { + ITriggerFunctions, +} from 'n8n-core'; + +import { + INodeType, + INodeTypeDescription, + ITriggerResponse, + IDataObject, +} from 'n8n-workflow'; + +import * as mqtt from 'mqtt'; + +import { + IClientOptions, +} from 'mqtt'; + +export class MqttTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'MQTT Trigger', + name: 'mqttTrigger', + icon: 'file:mqtt.png', + group: ['trigger'], + version: 1, + description: 'Listens to MQTT events', + defaults: { + name: 'MQTT Trigger', + color: '#9b27af', + }, + inputs: [], + outputs: ['main'], + credentials: [ + { + name: 'mqtt', + required: true, + }, + ], + properties: [ + { + displayName: 'Topics', + name: 'topics', + type: 'string', + default: '', + description: `Topics to subscribe to, multiple can be defined with comma.
+ wildcard characters are supported (+ - for single level and # - for multi level)`, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Only Message', + name: 'onlyMessage', + type: 'boolean', + default: false, + description: 'Returns only the message property.', + }, + { + displayName: 'JSON Parse Message', + name: 'jsonParseMessage', + type: 'boolean', + default: false, + description: 'Try to parse the message to an object.', + }, + ], + }, + ], + }; + + async trigger(this: ITriggerFunctions): Promise { + + const credentials = this.getCredentials('mqtt'); + + if (!credentials) { + throw new Error('Credentials are mandatory!'); + } + + const topics = (this.getNodeParameter('topics') as string).split(','); + + const options = this.getNodeParameter('options') as IDataObject; + + if (!topics) { + throw new Error('Topics are mandatory!'); + } + + const protocol = credentials.protocol as string || 'mqtt'; + const host = credentials.host as string; + const brokerUrl = `${protocol}://${host}`; + const port = credentials.port as number || 1883; + + const clientOptions: IClientOptions = { + port, + }; + + if (credentials.username && credentials.password) { + clientOptions.username = credentials.username as string; + clientOptions.password = credentials.password as string; + } + + const client = mqtt.connect(brokerUrl, clientOptions); + + const self = this; + + async function manualTriggerFunction() { + await new Promise(( resolve, reject ) => { + client.on('connect', () => { + client.subscribe(topics, (err, granted) => { + if (err) { + reject(err); + } + client.on('message', (topic: string, message: Buffer | string) => { // tslint:disable-line:no-any + + let result: IDataObject = {}; + + message = message.toString() as string; + + if (options.jsonParseMessage) { + try { + message = JSON.parse(message.toString()); + } catch (err) {} + } + + result.message = message; + result.topic = topic; + + if (options.onlyMessage) { + //@ts-ignore + result = message; + } + + self.emit([self.helpers.returnJsonArray([result], + )]); + resolve(true); + }); + }); + }); + + client.on('error', (error) => { + reject(error); + }); + }); + } + + manualTriggerFunction(); + + async function closeFunction() { + client.end(); + } + + return { + closeFunction, + manualTriggerFunction, + }; + } +} diff --git a/packages/nodes-base/nodes/MQTT/mqtt.png b/packages/nodes-base/nodes/MQTT/mqtt.png new file mode 100644 index 0000000000..12c5f24952 Binary files /dev/null and b/packages/nodes-base/nodes/MQTT/mqtt.png differ diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index baa3ed6483..f8b3b6db92 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -115,6 +115,7 @@ "dist/credentials/MoceanApi.credentials.js", "dist/credentials/MondayComApi.credentials.js", "dist/credentials/MongoDb.credentials.js", + "dist/credentials/Mqtt.credentials.js", "dist/credentials/Msg91Api.credentials.js", "dist/credentials/MySql.credentials.js", "dist/credentials/NextCloudApi.credentials.js", @@ -288,6 +289,7 @@ "dist/nodes/Mocean/Mocean.node.js", "dist/nodes/MondayCom/MondayCom.node.js", "dist/nodes/MongoDb/MongoDb.node.js", + "dist/nodes/MQTT/MqttTrigger.node.js", "dist/nodes/MoveBinaryData.node.js", "dist/nodes/Msg91/Msg91.node.js", "dist/nodes/MySql/MySql.node.js", @@ -373,6 +375,7 @@ "@types/mailparser": "^2.7.3", "@types/moment-timezone": "^0.5.12", "@types/mongodb": "^3.5.4", + "@types/mqtt": "^2.5.0", "@types/mssql": "^6.0.2", "@types/node": "^14.0.27", "@types/nodemailer": "^6.4.0", @@ -409,6 +412,7 @@ "moment": "2.24.0", "moment-timezone": "^0.5.28", "mongodb": "^3.5.5", + "mqtt": "^4.2.0", "mssql": "^6.2.0", "mysql2": "^2.0.1", "n8n-core": "~0.43.0",