diff --git a/packages/nodes-base/credentials/Kafka.credentials.ts b/packages/nodes-base/credentials/Kafka.credentials.ts index 2366e89f92..1774606977 100644 --- a/packages/nodes-base/credentials/Kafka.credentials.ts +++ b/packages/nodes-base/credentials/Kafka.credentials.ts @@ -13,12 +13,14 @@ export class Kafka implements ICredentialType { name: 'clientId', type: 'string' as NodePropertyTypes, default: '', + placeholder: 'my-app', }, { displayName: 'Brokers', name: 'brokers', type: 'string' as NodePropertyTypes, default: '', + placeholder: 'kafka1:9092,kafka2:9092', }, { displayName: 'SSL', @@ -26,5 +28,22 @@ export class Kafka implements ICredentialType { type: 'boolean' as NodePropertyTypes, default: true, }, + { + displayName: 'Username', + name: 'username', + type: 'string' as NodePropertyTypes, + default: '', + description: 'Optional username if authenticated is required.', + }, + { + displayName: 'Password', + name: 'password', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + default: '', + description: 'Optional password if authenticated is required.', + }, ]; } diff --git a/packages/nodes-base/credentials/KafkaPlain.credentials.ts b/packages/nodes-base/credentials/KafkaPlain.credentials.ts deleted file mode 100644 index 7de6cbbf46..0000000000 --- a/packages/nodes-base/credentials/KafkaPlain.credentials.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { - ICredentialType, - NodePropertyTypes, -} from 'n8n-workflow'; - -export class KafkaPlain implements ICredentialType { - extends = [ - 'kafka', - ]; - name = 'kafkaPlain'; - displayName = 'Kafka'; - properties = [ - { - displayName: 'Username', - name: 'username', - type: 'string' as NodePropertyTypes, - default: '', - }, - { - displayName: 'Password', - name: 'password', - type: 'string' as NodePropertyTypes, - typeOptions: { - password: true, - }, - default: '', - }, - ]; -} diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 851560a22a..61f4268e24 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -34,58 +34,38 @@ export class Kafka implements INodeType { credentials: [ { name: 'kafka', - displayOptions: { - show: { - authentication: [ - 'none', - ], - }, - }, - required: true, - }, - { - name: 'kafkaPlain', - displayOptions: { - show: { - authentication: [ - 'plain', - ], - }, - }, required: true, }, ], properties: [ - { - displayName: 'Authentication', - name: 'authentication', - type: 'options', - options: [ - { - name: 'None', - value: 'none', - }, - { - name: 'Plain', - value: 'plain', - }, - ], - default: 'none', - }, { displayName: 'Topic', name: 'topic', type: 'string', default: '', placeholder: 'topic-name', - description: 'Name of the queue of topic to publish to', + description: 'Name of the queue of topic to publish to.', + }, + { + displayName: 'Send Input Data', + name: 'sendInputData', + type: 'boolean', + default: true, + description: 'Send the the data the node receives as JSON to Kafka.', }, { displayName: 'Message', name: 'message', type: 'string', + displayOptions: { + show: { + sendInputData: [ + false, + ], + }, + }, default: '', - description: 'The message to be sent', + description: 'The message to be sent.', }, { displayName: 'JSON Parameters', @@ -142,7 +122,7 @@ export class Kafka implements INodeType { }, }, default: '', - description: 'Header parameters as JSON (flat object)', + description: 'Header parameters as JSON (flat object).', }, { displayName: 'Options', @@ -156,21 +136,21 @@ export class Kafka implements INodeType { name: 'acks', type: 'boolean', default: false, - description: 'Whether or not producer must wait for acknowledgement from all replicas', + description: 'Whether or not producer must wait for acknowledgement from all replicas.', }, { displayName: 'Compression', name: 'compression', type: 'boolean', default: false, - description: 'Send the data in a compressed format using the GZIP codec', + description: 'Send the data in a compressed format using the GZIP codec.', }, { displayName: 'Timeout', name: 'timeout', type: 'number', default: 30000, - description: 'The time to await a response in ms', + description: 'The time to await a response in ms.', }, ], }, @@ -182,13 +162,12 @@ export class Kafka implements INodeType { const length = items.length as unknown as number; - const authentication = this.getNodeParameter('authentication', 0) as string; - const topicMessages: TopicMessages[] = []; - let responseData; + let responseData: IDataObject[]; const options = this.getNodeParameter('options', 0) as IDataObject; + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; const timeout = options.timeout as number; @@ -200,11 +179,9 @@ export class Kafka implements INodeType { compression = CompressionTypes.GZIP; } - let credentials: IDataObject = {}; + const credentials = this.getCredentials('kafka') as IDataObject; - const sasl: SASLOptions | IDataObject = {}; - - const brokers = (credentials.brokers as string || '').split(',') as string[]; + const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[]; const clientId = credentials.clientId as string; @@ -214,18 +191,13 @@ export class Kafka implements INodeType { clientId, brokers, ssl, - //@ts-ignore - sasl, }; - if (authentication === 'plain') { - credentials = this.getCredentials('kafkaPlain') as IDataObject; - sasl.username = credentials.username as string; - sasl.password = credentials.password as string; - sasl.mechanism = 'plain'; - } else { - credentials = this.getCredentials('kafka') as IDataObject; - delete config.sasl; + if (credentials.username || credentials.password) { + config.sasl = { + username: credentials.username as string, + password: credentials.password as string, + } as SASLOptions; } const kafka = new apacheKafka(config); @@ -234,9 +206,14 @@ export class Kafka implements INodeType { await producer.connect(); - for (let i = 0; i < length; i++) { + let message: string; - const message = this.getNodeParameter('message', i) as string; + for (let i = 0; i < length; i++) { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); + } else { + message = this.getNodeParameter('message', i) as string; + } const topic = this.getNodeParameter('topic', i) as string; @@ -278,7 +255,14 @@ export class Kafka implements INodeType { timeout, compression, acks, + }, + ); + + if (responseData.length === 0) { + responseData.push({ + success: true, }); + } await producer.disconnect(); diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index e1aa40c109..070adb8a05 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -102,7 +102,6 @@ "dist/credentials/JiraSoftwareServerApi.credentials.js", "dist/credentials/JotFormApi.credentials.js", "dist/credentials/Kafka.credentials.js", - "dist/credentials/KafkaPlain.credentials.js", "dist/credentials/KeapOAuth2Api.credentials.js", "dist/credentials/LinkedInOAuth2Api.credentials.js", "dist/credentials/MailerLiteApi.credentials.js",