diff --git a/packages/nodes-base/credentials/Kafka.credentials.ts b/packages/nodes-base/credentials/Kafka.credentials.ts
index 8d1b817d21..a27f008589 100644
--- a/packages/nodes-base/credentials/Kafka.credentials.ts
+++ b/packages/nodes-base/credentials/Kafka.credentials.ts
@@ -11,6 +11,7 @@ export class Kafka implements ICredentialType {
type: 'string',
default: '',
placeholder: 'my-app',
+ hint: 'Will not affect the connection, but will be used to identify the client in the Kafka server logs. Read more here',
},
{
displayName: 'Brokers',
diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts
index 96080587d7..15f3327367 100644
--- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts
+++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts
@@ -11,7 +11,11 @@ import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { IExecuteFunctions } from 'n8n-core';
import {
+ ICredentialDataDecryptedObject,
+ ICredentialsDecrypted,
+ ICredentialTestFunctions,
IDataObject,
+ INodeCredentialTestResult,
INodeExecutionData,
INodeType,
INodeTypeDescription,
@@ -35,6 +39,7 @@ export class Kafka implements INodeType {
{
name: 'kafka',
required: true,
+ testedBy: 'kafkaConnectionTest',
},
],
properties: [
@@ -185,6 +190,56 @@ export class Kafka implements INodeType {
],
};
+ methods = {
+ credentialTest: {
+ async kafkaConnectionTest(
+ this: ICredentialTestFunctions,
+ credential: ICredentialsDecrypted,
+ ): Promise {
+ const credentials = credential.data as ICredentialDataDecryptedObject;
+ try {
+ const brokers = ((credentials.brokers as string) || '')
+ .split(',')
+ .map((item) => item.trim()) as string[];
+
+ const clientId = credentials.clientId as string;
+
+ const ssl = credentials.ssl as boolean;
+
+ const config: KafkaConfig = {
+ clientId,
+ brokers,
+ ssl,
+ };
+ if (credentials.authentication === true) {
+ if (!(credentials.username && credentials.password)) {
+ throw Error('Username and password are required for authentication');
+ }
+ config.sasl = {
+ username: credentials.username as string,
+ password: credentials.password as string,
+ mechanism: credentials.saslMechanism as string,
+ } as SASLOptions;
+ }
+
+ const kafka = new apacheKafka(config);
+
+ await kafka.admin().connect();
+ await kafka.admin().disconnect();
+ return {
+ status: 'OK',
+ message: 'Authentication successful',
+ };
+ } catch (error) {
+ return {
+ status: 'Error',
+ message: error.message,
+ };
+ }
+ },
+ },
+ };
+
async execute(this: IExecuteFunctions): Promise {
const items = this.getInputData();
diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
index 69e68a93f3..dcde8ca63a 100644
--- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
+++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
@@ -114,7 +114,7 @@ export class KafkaTrigger implements INodeType {
displayName: 'Max Number of Requests',
name: 'maxInFlightRequests',
type: 'number',
- default: 0,
+ default: 1,
description:
'Max number of requests that may be in progress at any time. If falsey then no limit.',
},
@@ -202,9 +202,15 @@ export class KafkaTrigger implements INodeType {
const kafka = new apacheKafka(config);
+ const maxInFlightRequests = (
+ this.getNodeParameter('options.maxInFlightRequests', null) === 0
+ ? null
+ : this.getNodeParameter('options.maxInFlightRequests', null)
+ ) as number;
+
const consumer = kafka.consumer({
groupId,
- maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
+ maxInFlightRequests,
sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});