fix(Kafka Node): Upgrade kafkajs and add tests (#14326)

Co-authored-by: Dana Lee <dana@n8n.io>
This commit is contained in:
Elias Meire
2025-04-02 17:12:42 +02:00
committed by GitHub
parent db381492a9
commit 5c58e8e8cf
7 changed files with 723 additions and 72 deletions

View File

@@ -0,0 +1,105 @@
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { mock } from 'jest-mock-extended';
import type { Producer } from 'kafkajs';
import { Kafka as apacheKafka } from 'kafkajs';
import path from 'path';
import { getWorkflowFilenames, testWorkflows } from '@test/nodes/Helpers';
jest.mock('kafkajs');
jest.mock('@kafkajs/confluent-schema-registry');
describe('Kafka Node', () => {
let mockProducer: jest.Mocked<Producer>;
let mockKafka: jest.Mocked<apacheKafka>;
let mockRegistry: jest.Mocked<SchemaRegistry>;
let mockProducerConnect: jest.Mock;
let mockProducerSend: jest.Mock;
let mockProducerDisconnect: jest.Mock;
let mockRegistryEncode: jest.Mock;
beforeAll(() => {
mockProducerConnect = jest.fn();
mockProducerSend = jest.fn().mockImplementation(async () => []);
mockProducerDisconnect = jest.fn();
mockProducer = mock<Producer>({
connect: mockProducerConnect,
send: mockProducerSend,
sendBatch: mockProducerSend,
disconnect: mockProducerDisconnect,
});
mockKafka = mock<apacheKafka>({
producer: jest.fn().mockReturnValue(mockProducer),
});
mockRegistryEncode = jest.fn((_id, input) => Buffer.from(JSON.stringify(input)));
mockRegistry = mock<SchemaRegistry>({
encode: mockRegistryEncode,
});
(apacheKafka as jest.Mock).mockReturnValue(mockKafka);
(SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry);
});
const workflows = getWorkflowFilenames(path.join(__dirname, 'test'));
testWorkflows(workflows);
test('should publish the correct kafka messages', async () => {
expect(mockProducerSend).toHaveBeenCalledTimes(2);
expect(mockProducerSend).toHaveBeenCalledWith({
acks: 1,
compression: 1,
timeout: 1000,
topicMessages: [
{
messages: [
{
headers: { header: 'value' },
key: 'messageKey',
value: '{"name":"First item","code":1}',
},
],
topic: 'test-topic',
},
{
messages: [
{
headers: { header: 'value' },
key: 'messageKey',
value: '{"name":"Second item","code":2}',
},
],
topic: 'test-topic',
},
],
});
expect(mockProducerSend).toHaveBeenCalledWith({
acks: 0,
compression: 0,
topicMessages: [
{
messages: [
{
headers: { headerKey: 'headerValue' },
key: null,
value: Buffer.from(JSON.stringify({ foo: 'bar' })),
},
],
topic: 'test-topic',
},
{
messages: [
{
headers: { headerKey: 'headerValue' },
key: null,
value: Buffer.from(JSON.stringify({ foo: 'bar' })),
},
],
topic: 'test-topic',
},
],
});
});
});

View File

@@ -0,0 +1,377 @@
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { mock } from 'jest-mock-extended';
import {
Kafka,
logLevel,
type Consumer,
type ConsumerRunConfig,
type EachMessageHandler,
type IHeaders,
type KafkaMessage,
type RecordBatchEntry,
} from 'kafkajs';
import { NodeOperationError } from 'n8n-workflow';
import { testTriggerNode } from '@test/nodes/TriggerHelpers';
import { KafkaTrigger } from './KafkaTrigger.node';
jest.mock('kafkajs');
jest.mock('@kafkajs/confluent-schema-registry');
describe('KafkaTrigger Node', () => {
let mockKafka: jest.Mocked<Kafka>;
let mockRegistry: jest.Mocked<SchemaRegistry>;
let mockConsumerConnect: jest.Mock;
let mockConsumerSubscribe: jest.Mock;
let mockConsumerRun: jest.Mock;
let mockConsumerDisconnect: jest.Mock;
let mockConsumerCreate: jest.Mock;
let mockRegistryDecode: jest.Mock;
let publishMessage: (message: Partial<KafkaMessage>) => Promise<void>;
beforeEach(() => {
let mockEachMessage: jest.Mocked<EachMessageHandler> = jest.fn(async () => {});
mockConsumerConnect = jest.fn();
mockConsumerSubscribe = jest.fn();
mockConsumerRun = jest.fn(({ eachMessage }: ConsumerRunConfig) => {
if (eachMessage) {
mockEachMessage = eachMessage;
}
});
mockConsumerDisconnect = jest.fn();
mockConsumerCreate = jest.fn(() =>
mock<Consumer>({
connect: mockConsumerConnect,
subscribe: mockConsumerSubscribe,
run: mockConsumerRun,
disconnect: mockConsumerDisconnect,
}),
);
publishMessage = async (message: Partial<KafkaMessage>) => {
await mockEachMessage({
message: {
attributes: 1,
key: Buffer.from('messageKey'),
offset: '0',
timestamp: new Date().toISOString(),
value: Buffer.from('message'),
headers: {} as IHeaders,
...message,
} as RecordBatchEntry,
partition: 0,
topic: 'test-topic',
heartbeat: jest.fn(),
pause: jest.fn(),
});
};
mockKafka = mock<Kafka>({
consumer: mockConsumerCreate,
});
mockRegistryDecode = jest.fn().mockResolvedValue({ data: 'decoded-data' });
mockRegistry = mock<SchemaRegistry>({
decode: mockRegistryDecode,
});
(Kafka as jest.Mock).mockReturnValue(mockKafka);
(SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry);
});
it('should connect to Kafka and subscribe to topic', async () => {
const { close, emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
options: {
fromBeginning: true,
parallelProcessing: true,
},
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
expect(Kafka).toHaveBeenCalledWith({
clientId: 'n8n-kafka',
brokers: ['localhost:9092'],
ssl: false,
logLevel: logLevel.ERROR,
});
expect(mockConsumerCreate).toHaveBeenCalledWith({
groupId: 'test-group',
maxInFlightRequests: null,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
expect(mockConsumerConnect).toHaveBeenCalled();
expect(mockConsumerSubscribe).toHaveBeenCalledWith({
topic: 'test-topic',
fromBeginning: true,
});
expect(mockConsumerRun).toHaveBeenCalled();
await publishMessage({
value: Buffer.from('message'),
});
expect(emit).toHaveBeenCalledWith([[{ json: { message: 'message', topic: 'test-topic' } }]]);
await close();
expect(mockConsumerDisconnect).toHaveBeenCalled();
});
it('should handle authentication when credentials are provided', async () => {
await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: true,
authentication: true,
username: 'test-user',
password: 'test-password',
saslMechanism: 'plain',
},
});
expect(Kafka).toHaveBeenCalledWith({
clientId: 'n8n-kafka',
brokers: ['localhost:9092'],
ssl: true,
logLevel: logLevel.ERROR,
sasl: {
username: 'test-user',
password: 'test-password',
mechanism: 'plain',
},
});
});
it('should throw an error if authentication is enabled but credentials are missing', async () => {
await expect(
testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: true,
},
}),
).rejects.toThrow(NodeOperationError);
});
it('should use schema registry when enabled', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: true,
schemaRegistryUrl: 'http://localhost:8081',
options: { parallelProcessing: true },
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
await publishMessage({
value: Buffer.from('test-message'),
headers: { 'content-type': Buffer.from('application/json') },
});
expect(SchemaRegistry).toHaveBeenCalledWith({
host: 'http://localhost:8081',
});
expect(mockRegistryDecode).toHaveBeenCalledWith(Buffer.from('test-message'));
expect(emit).toHaveBeenCalledWith([
[
{
json: {
message: { data: 'decoded-data' },
topic: 'test-topic',
},
},
],
]);
});
it('should parse JSON message when jsonParseMessage is true', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
options: {
jsonParseMessage: true,
parallelProcessing: true,
onlyMessage: true,
},
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
const jsonData = { foo: 'bar' };
await publishMessage({
value: Buffer.from(JSON.stringify(jsonData)),
});
expect(emit).toHaveBeenCalledWith([[{ json: jsonData }]]);
});
it('should include headers when returnHeaders is true', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
typeVersion: 1,
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
options: {
returnHeaders: true,
},
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
await publishMessage({
value: Buffer.from('test-message'),
headers: {
'content-type': Buffer.from('application/json'),
'correlation-id': '123456',
'with-array-value': ['1', '2', '3'],
empty: undefined,
},
});
expect(emit).toHaveBeenCalledWith([
[
{
json: {
message: 'test-message',
topic: 'test-topic',
headers: {
'content-type': 'application/json',
'correlation-id': '123456',
'with-array-value': '1,2,3',
empty: '',
},
},
},
],
]);
});
it('should handle manual trigger mode', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'manual',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
options: {
parallelProcessing: true,
},
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
expect(mockConsumerConnect).toHaveBeenCalledTimes(1);
expect(mockConsumerSubscribe).toHaveBeenCalledTimes(1);
expect(mockConsumerRun).toHaveBeenCalledTimes(1);
expect(emit).not.toHaveBeenCalled();
await publishMessage({ value: Buffer.from('test') });
expect(emit).toHaveBeenCalledWith([[{ json: { message: 'test', topic: 'test-topic' } }]]);
});
it('should handle sequential processing when parallelProcessing is false', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: false,
options: {
parallelProcessing: false,
},
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
const publishPromise = publishMessage({
value: Buffer.from('test-message'),
});
expect(emit).toHaveBeenCalled();
const deferredPromise = emit.mock.calls[0][2];
expect(deferredPromise).toBeDefined();
deferredPromise?.resolve(mock());
await publishPromise;
});
});

View File

@@ -183,7 +183,7 @@ export class KafkaTrigger implements INodeType {
const credentials = await this.getCredentials('kafka');
const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());
const brokers = ((credentials.brokers as string) ?? '').split(',').map((item) => item.trim());
const clientId = credentials.clientId as string;
@@ -214,14 +214,19 @@ export class KafkaTrigger implements INodeType {
} as SASLOptions;
}
const kafka = new apacheKafka(config);
const maxInFlightRequests = (
this.getNodeParameter('options.maxInFlightRequests', null) === 0
? null
: this.getNodeParameter('options.maxInFlightRequests', null)
) as number;
const parallelProcessing = options.parallelProcessing as boolean;
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const kafka = new apacheKafka(config);
const consumer = kafka.consumer({
groupId,
maxInFlightRequests,
@@ -229,17 +234,16 @@ export class KafkaTrigger implements INodeType {
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});
const parallelProcessing = options.parallelProcessing as boolean;
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
await consumer.disconnect();
}
const startConsumer = async () => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
await consumer.run({
autoCommitInterval: (options.autoCommitInterval as number) || null,
autoCommitThreshold: (options.autoCommitThreshold as number) || null,
@@ -261,13 +265,12 @@ export class KafkaTrigger implements INodeType {
}
if (options.returnHeaders && message.headers) {
const headers: { [key: string]: string } = {};
for (const key of Object.keys(message.headers)) {
const header = message.headers[key];
headers[key] = header?.toString('utf8') || '';
}
data.headers = headers;
data.headers = Object.fromEntries(
Object.entries(message.headers).map(([headerKey, headerValue]) => [
headerKey,
headerValue?.toString('utf8') ?? '',
]),
);
}
data.message = value;
@@ -291,27 +294,24 @@ export class KafkaTrigger implements INodeType {
});
};
await startConsumer();
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
await consumer.disconnect();
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually. So the function has to make sure that
// the emit() gets called with similar data like when it
// would trigger by itself so that the user knows what data
// to expect.
async function manualTriggerFunction() {
if (this.getMode() !== 'manual') {
await startConsumer();
}
return { closeFunction };
} else {
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually. So the function has to make sure that
// the emit() gets called with similar data like when it
// would trigger by itself so that the user knows what data
// to expect.
async function manualTriggerFunction() {
await startConsumer();
}
return {
closeFunction,
manualTriggerFunction,
};
return {
closeFunction,
manualTriggerFunction,
};
}
}
}

View File

@@ -0,0 +1,134 @@
{
"name": "Kafka test",
"nodes": [
{
"parameters": {},
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [0, -100],
"id": "d0594d58-ebb3-4dc0-a241-3f2531212fd7",
"name": "When clicking Test workflow"
},
{
"parameters": {
"topic": "test-topic",
"useKey": true,
"key": "messageKey",
"headersUi": {
"headerValues": [
{
"key": "header",
"value": "value"
}
]
},
"options": {
"acks": true,
"compression": true,
"timeout": 1000
}
},
"type": "n8n-nodes-base.kafka",
"typeVersion": 1,
"position": [440, -200],
"id": "f29d6af7-9ded-421a-8ada-cea80eac9464",
"name": "Send Input Data",
"credentials": {
"kafka": {
"id": "JJBjHkOrIfcj91EX",
"name": "Kafka account"
}
}
},
{
"parameters": {
"topic": "test-topic",
"sendInputData": false,
"message": "={{ JSON.stringify({foo: 'bar'}) }}",
"jsonParameters": true,
"useSchemaRegistry": true,
"schemaRegistryUrl": "https://test-kafka-registry.local",
"eventName": "test-event-name",
"headerParametersJson": "{\n \"headerKey\": \"headerValue\"\n}",
"options": {}
},
"type": "n8n-nodes-base.kafka",
"typeVersion": 1,
"position": [440, 0],
"id": "d851834f-6b97-445d-8e69-cc2e873bdf80",
"name": "Schema Registry",
"credentials": {
"kafka": {
"id": "JJBjHkOrIfcj91EX",
"name": "Kafka account"
}
}
},
{
"parameters": {
"jsCode": "return [\n {\n \"name\": \"First item\",\n \"code\": 1\n },\n {\n \"name\": \"Second item\",\n \"code\": 2\n }\n]"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [220, -100],
"id": "50ce815c-cf9a-4d83-8739-c95f9c3d7ec6",
"name": "Test Data"
}
],
"pinData": {
"Send Input Data": [
{
"json": {
"success": true
}
}
],
"Schema Registry": [
{
"json": {
"success": true
}
}
]
},
"connections": {
"When clicking Test workflow": {
"main": [
[
{
"node": "Test Data",
"type": "main",
"index": 0
}
]
]
},
"Test Data": {
"main": [
[
{
"node": "Schema Registry",
"type": "main",
"index": 0
},
{
"node": "Send Input Data",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "be4cbb16-225f-41ed-b897-895aaa34ea34",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "27cc9b56542ad45b38725555722c50a1c3fee1670bbb67980558314ee08517c4"
},
"id": "r7XhZVcfhaGvCbgE",
"tags": []
}