diff --git a/packages/nodes-base/nodes/MQTT/GenericFunctions.ts b/packages/nodes-base/nodes/MQTT/GenericFunctions.ts index 2500af2a16..7ff91fee55 100644 --- a/packages/nodes-base/nodes/MQTT/GenericFunctions.ts +++ b/packages/nodes-base/nodes/MQTT/GenericFunctions.ts @@ -1,5 +1,5 @@ -import { connectAsync, type IClientOptions, type MqttClient } from 'mqtt'; -import { randomString } from 'n8n-workflow'; +import { connect, type IClientOptions, type MqttClient } from 'mqtt'; +import { ApplicationError, randomString } from 'n8n-workflow'; import { formatPrivateKey } from '@utils/utilities'; interface BaseMqttCredential { @@ -49,5 +49,23 @@ export const createClient = async (credentials: MqttCredential): Promise { + const client = connect(clientOptions); + + const onConnect = () => { + client.removeListener('connect', onConnect); + // eslint-disable-next-line @typescript-eslint/no-use-before-define + client.removeListener('error', onError); + resolve(client); + }; + + const onError = (error: Error) => { + client.removeListener('connect', onConnect); + client.removeListener('error', onError); + reject(new ApplicationError(error.message)); + }; + + client.once('connect', onConnect); + client.once('error', onError); + }); };