diff --git a/packages/nodes-base/credentials/Kafka.credentials.ts b/packages/nodes-base/credentials/Kafka.credentials.ts new file mode 100644 index 0000000000..2366e89f92 --- /dev/null +++ b/packages/nodes-base/credentials/Kafka.credentials.ts @@ -0,0 +1,30 @@ +import { + ICredentialType, + NodePropertyTypes, +} from 'n8n-workflow'; + +export class Kafka implements ICredentialType { + name = 'kafka'; + displayName = 'Kafka'; + documentationUrl = 'kafka'; + properties = [ + { + displayName: 'Client ID', + name: 'clientId', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Brokers', + name: 'brokers', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'SSL', + name: 'ssl', + type: 'boolean' as NodePropertyTypes, + default: true, + }, + ]; +} diff --git a/packages/nodes-base/credentials/KafkaPlain.credentials.ts b/packages/nodes-base/credentials/KafkaPlain.credentials.ts new file mode 100644 index 0000000000..7de6cbbf46 --- /dev/null +++ b/packages/nodes-base/credentials/KafkaPlain.credentials.ts @@ -0,0 +1,29 @@ +import { + ICredentialType, + NodePropertyTypes, +} from 'n8n-workflow'; + +export class KafkaPlain implements ICredentialType { + extends = [ + 'kafka', + ]; + name = 'kafkaPlain'; + displayName = 'Kafka'; + properties = [ + { + displayName: 'Username', + name: 'username', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Password', + name: 'password', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + default: '', + }, + ]; +} diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts new file mode 100644 index 0000000000..851560a22a --- /dev/null +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -0,0 +1,287 @@ +import { + CompressionTypes, + Kafka as apacheKafka, + KafkaConfig, + SASLOptions, + TopicMessages, +} from 'kafkajs'; + +import { + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +export class Kafka implements INodeType { + description: INodeTypeDescription = { + displayName: 'Kafka', + name: 'kafka', + icon: 'file:kafka.svg', + group: ['transform'], + version: 1, + description: 'Sends messages to a Kafka topic', + defaults: { + name: 'Kafka', + color: '#000000', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'kafka', + displayOptions: { + show: { + authentication: [ + 'none', + ], + }, + }, + required: true, + }, + { + name: 'kafkaPlain', + displayOptions: { + show: { + authentication: [ + 'plain', + ], + }, + }, + required: true, + }, + ], + properties: [ + { + displayName: 'Authentication', + name: 'authentication', + type: 'options', + options: [ + { + name: 'None', + value: 'none', + }, + { + name: 'Plain', + value: 'plain', + }, + ], + default: 'none', + }, + { + displayName: 'Topic', + name: 'topic', + type: 'string', + default: '', + placeholder: 'topic-name', + description: 'Name of the queue of topic to publish to', + }, + { + displayName: 'Message', + name: 'message', + type: 'string', + default: '', + description: 'The message to be sent', + }, + { + displayName: 'JSON Parameters', + name: 'jsonParameters', + type: 'boolean', + default: false, + }, + { + displayName: 'Headers', + name: 'headersUi', + placeholder: 'Add Header', + type: 'fixedCollection', + displayOptions: { + show: { + jsonParameters: [ + false, + ], + }, + }, + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'headerValues', + displayName: 'Header', + values: [ + { + displayName: 'Key', + name: 'key', + type: 'string', + default: '', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + { + displayName: 'Headers (JSON)', + name: 'headerParametersJson', + type: 'json', + displayOptions: { + show: { + jsonParameters: [ + true, + ], + }, + }, + default: '', + description: 'Header parameters as JSON (flat object)', + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + default: {}, + placeholder: 'Add Option', + options: [ + { + displayName: 'Acks', + name: 'acks', + type: 'boolean', + default: false, + description: 'Whether or not producer must wait for acknowledgement from all replicas', + }, + { + displayName: 'Compression', + name: 'compression', + type: 'boolean', + default: false, + description: 'Send the data in a compressed format using the GZIP codec', + }, + { + displayName: 'Timeout', + name: 'timeout', + type: 'number', + default: 30000, + description: 'The time to await a response in ms', + }, + ], + }, + ], + }; + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + + const length = items.length as unknown as number; + + const authentication = this.getNodeParameter('authentication', 0) as string; + + const topicMessages: TopicMessages[] = []; + + let responseData; + + const options = this.getNodeParameter('options', 0) as IDataObject; + + const timeout = options.timeout as number; + + let compression = CompressionTypes.None; + + const acks = (options.acks === true) ? 1 : 0; + + if (options.compression === true) { + compression = CompressionTypes.GZIP; + } + + let credentials: IDataObject = {}; + + const sasl: SASLOptions | IDataObject = {}; + + const brokers = (credentials.brokers as string || '').split(',') as string[]; + + const clientId = credentials.clientId as string; + + const ssl = credentials.ssl as boolean; + + const config: KafkaConfig = { + clientId, + brokers, + ssl, + //@ts-ignore + sasl, + }; + + if (authentication === 'plain') { + credentials = this.getCredentials('kafkaPlain') as IDataObject; + sasl.username = credentials.username as string; + sasl.password = credentials.password as string; + sasl.mechanism = 'plain'; + } else { + credentials = this.getCredentials('kafka') as IDataObject; + delete config.sasl; + } + + const kafka = new apacheKafka(config); + + const producer = kafka.producer(); + + await producer.connect(); + + for (let i = 0; i < length; i++) { + + const message = this.getNodeParameter('message', i) as string; + + const topic = this.getNodeParameter('topic', i) as string; + + const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean; + + let headers; + + if (jsonParameters === true) { + headers = this.getNodeParameter('headerParametersJson', i) as string; + try { + headers = JSON.parse(headers); + } catch (exception) { + throw new Error('Headers must be a valid json'); + } + } else { + const values = (this.getNodeParameter('headersUi', i) as IDataObject).headerValues as IDataObject[]; + headers = {}; + if (values !== undefined) { + for (const value of values) { + //@ts-ignore + headers[value.key] = value.value; + } + } + } + + topicMessages.push( + { + topic, + messages: [{ + value: message, + headers, + }], + }); + } + + responseData = await producer.sendBatch( + { + topicMessages, + timeout, + compression, + acks, + }); + + await producer.disconnect(); + + return [this.helpers.returnJsonArray(responseData)]; + } +} diff --git a/packages/nodes-base/nodes/Kafka/kafka.svg b/packages/nodes-base/nodes/Kafka/kafka.svg new file mode 100644 index 0000000000..91f4712005 --- /dev/null +++ b/packages/nodes-base/nodes/Kafka/kafka.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index b9876c75a9..e1aa40c109 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -101,6 +101,8 @@ "dist/credentials/JiraSoftwareCloudApi.credentials.js", "dist/credentials/JiraSoftwareServerApi.credentials.js", "dist/credentials/JotFormApi.credentials.js", + "dist/credentials/Kafka.credentials.js", + "dist/credentials/KafkaPlain.credentials.js", "dist/credentials/KeapOAuth2Api.credentials.js", "dist/credentials/LinkedInOAuth2Api.credentials.js", "dist/credentials/MailerLiteApi.credentials.js", @@ -299,6 +301,7 @@ "dist/nodes/Jira/Jira.node.js", "dist/nodes/Jira/JiraTrigger.node.js", "dist/nodes/JotForm/JotFormTrigger.node.js", + "dist/nodes/Kafka/Kafka.node.js", "dist/nodes/Keap/Keap.node.js", "dist/nodes/Keap/KeapTrigger.node.js", "dist/nodes/LinkedIn/LinkedIn.node.js", @@ -454,6 +457,7 @@ "imap-simple": "^4.3.0", "iso-639-1": "^2.1.3", "jsonwebtoken": "^8.5.1", + "kafkajs": "^1.14.0", "lodash.get": "^4.4.2", "lodash.set": "^4.3.2", "lodash.unset": "^4.5.2",