mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
🔀 Merge branch 'Add-schema-registry-into-kafka' of https://github.com/rgeorgel/n8n into rgeorgel-Add-schema-registry-into-kafka
This commit is contained in:
@@ -5,6 +5,8 @@ import {
|
||||
SASLOptions,
|
||||
} from 'kafkajs';
|
||||
|
||||
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
|
||||
|
||||
import {
|
||||
ITriggerFunctions,
|
||||
} from 'n8n-core';
|
||||
@@ -55,6 +57,29 @@ export class KafkaTrigger implements INodeType {
|
||||
placeholder: 'n8n-kafka',
|
||||
description: 'ID of the consumer group.',
|
||||
},
|
||||
{
|
||||
displayName: 'Use Schema Registry',
|
||||
name: 'useSchemaRegistry',
|
||||
type: 'boolean',
|
||||
default: false,
|
||||
description: 'Use Confluent Schema Registry.',
|
||||
},
|
||||
{
|
||||
displayName: 'Schema Registry URL',
|
||||
name: 'schemaRegistryUrl',
|
||||
type: 'string',
|
||||
required: true,
|
||||
displayOptions: {
|
||||
show: {
|
||||
useSchemaRegistry: [
|
||||
true,
|
||||
],
|
||||
},
|
||||
},
|
||||
placeholder: 'https://schema-registry-domain:8081',
|
||||
default: '',
|
||||
description: 'URL of the schema registry.',
|
||||
},
|
||||
{
|
||||
displayName: 'Options',
|
||||
name: 'options',
|
||||
@@ -104,6 +129,13 @@ export class KafkaTrigger implements INodeType {
|
||||
default: 30000,
|
||||
description: 'The time to await a response in ms.',
|
||||
},
|
||||
{
|
||||
displayName: 'Return headers',
|
||||
name: 'returnHeaders',
|
||||
type: 'boolean',
|
||||
default: false,
|
||||
description: 'Return the headers received from Kafka',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
@@ -153,6 +185,10 @@ export class KafkaTrigger implements INodeType {
|
||||
|
||||
const self = this;
|
||||
|
||||
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
|
||||
|
||||
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
|
||||
|
||||
const startConsumer = async () => {
|
||||
await consumer.run({
|
||||
eachMessage: async ({ topic, message }) => {
|
||||
@@ -166,6 +202,23 @@ export class KafkaTrigger implements INodeType {
|
||||
} catch (error) { }
|
||||
}
|
||||
|
||||
if (useSchemaRegistry) {
|
||||
try {
|
||||
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
|
||||
value = await registry.decode(message.value as Buffer);
|
||||
} catch (error) { }
|
||||
}
|
||||
|
||||
if (options.returnHeaders) {
|
||||
const headers: {[key: string]: string} = {};
|
||||
for (const key in message.headers) {
|
||||
const header = message.headers[key];
|
||||
headers[key] = header?.toString('utf8') || '';
|
||||
}
|
||||
|
||||
data.headers = headers;
|
||||
}
|
||||
|
||||
data.message = value;
|
||||
data.topic = topic;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user