From 5c58e8e8cfe5d04917b29d3d7ab9c14c27d95fdf Mon Sep 17 00:00:00 2001 From: Elias Meire Date: Wed, 2 Apr 2025 17:12:42 +0200 Subject: [PATCH] fix(Kafka Node): Upgrade kafkajs and add tests (#14326) Co-authored-by: Dana Lee --- .../nodes-base/nodes/Kafka/Kafka.node.test.ts | 105 +++++ .../nodes/Kafka/KafkaTrigger.node.test.ts | 377 ++++++++++++++++++ .../nodes/Kafka/KafkaTrigger.node.ts | 78 ++-- .../nodes-base/nodes/Kafka/test/workflow.json | 134 +++++++ packages/nodes-base/package.json | 4 +- .../nodes-base/test/nodes/TriggerHelpers.ts | 36 +- pnpm-lock.yaml | 61 ++- 7 files changed, 723 insertions(+), 72 deletions(-) create mode 100644 packages/nodes-base/nodes/Kafka/Kafka.node.test.ts create mode 100644 packages/nodes-base/nodes/Kafka/KafkaTrigger.node.test.ts create mode 100644 packages/nodes-base/nodes/Kafka/test/workflow.json diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.test.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.test.ts new file mode 100644 index 0000000000..27d20d750a --- /dev/null +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.test.ts @@ -0,0 +1,105 @@ +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; +import { mock } from 'jest-mock-extended'; +import type { Producer } from 'kafkajs'; +import { Kafka as apacheKafka } from 'kafkajs'; +import path from 'path'; + +import { getWorkflowFilenames, testWorkflows } from '@test/nodes/Helpers'; + +jest.mock('kafkajs'); +jest.mock('@kafkajs/confluent-schema-registry'); + +describe('Kafka Node', () => { + let mockProducer: jest.Mocked; + let mockKafka: jest.Mocked; + let mockRegistry: jest.Mocked; + let mockProducerConnect: jest.Mock; + let mockProducerSend: jest.Mock; + let mockProducerDisconnect: jest.Mock; + let mockRegistryEncode: jest.Mock; + + beforeAll(() => { + mockProducerConnect = jest.fn(); + mockProducerSend = jest.fn().mockImplementation(async () => []); + mockProducerDisconnect = jest.fn(); + + mockProducer = mock({ + connect: mockProducerConnect, + send: mockProducerSend, + sendBatch: mockProducerSend, + disconnect: mockProducerDisconnect, + }); + + mockKafka = mock({ + producer: jest.fn().mockReturnValue(mockProducer), + }); + + mockRegistryEncode = jest.fn((_id, input) => Buffer.from(JSON.stringify(input))); + mockRegistry = mock({ + encode: mockRegistryEncode, + }); + + (apacheKafka as jest.Mock).mockReturnValue(mockKafka); + (SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry); + }); + + const workflows = getWorkflowFilenames(path.join(__dirname, 'test')); + testWorkflows(workflows); + + test('should publish the correct kafka messages', async () => { + expect(mockProducerSend).toHaveBeenCalledTimes(2); + expect(mockProducerSend).toHaveBeenCalledWith({ + acks: 1, + compression: 1, + timeout: 1000, + topicMessages: [ + { + messages: [ + { + headers: { header: 'value' }, + key: 'messageKey', + value: '{"name":"First item","code":1}', + }, + ], + topic: 'test-topic', + }, + { + messages: [ + { + headers: { header: 'value' }, + key: 'messageKey', + value: '{"name":"Second item","code":2}', + }, + ], + topic: 'test-topic', + }, + ], + }); + expect(mockProducerSend).toHaveBeenCalledWith({ + acks: 0, + compression: 0, + topicMessages: [ + { + messages: [ + { + headers: { headerKey: 'headerValue' }, + key: null, + value: Buffer.from(JSON.stringify({ foo: 'bar' })), + }, + ], + topic: 'test-topic', + }, + { + messages: [ + { + headers: { headerKey: 'headerValue' }, + key: null, + value: Buffer.from(JSON.stringify({ foo: 'bar' })), + }, + ], + topic: 'test-topic', + }, + ], + }); + }); +}); diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.test.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.test.ts new file mode 100644 index 0000000000..c543b0fb5b --- /dev/null +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.test.ts @@ -0,0 +1,377 @@ +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; +import { mock } from 'jest-mock-extended'; +import { + Kafka, + logLevel, + type Consumer, + type ConsumerRunConfig, + type EachMessageHandler, + type IHeaders, + type KafkaMessage, + type RecordBatchEntry, +} from 'kafkajs'; +import { NodeOperationError } from 'n8n-workflow'; + +import { testTriggerNode } from '@test/nodes/TriggerHelpers'; + +import { KafkaTrigger } from './KafkaTrigger.node'; + +jest.mock('kafkajs'); +jest.mock('@kafkajs/confluent-schema-registry'); + +describe('KafkaTrigger Node', () => { + let mockKafka: jest.Mocked; + let mockRegistry: jest.Mocked; + let mockConsumerConnect: jest.Mock; + let mockConsumerSubscribe: jest.Mock; + let mockConsumerRun: jest.Mock; + let mockConsumerDisconnect: jest.Mock; + let mockConsumerCreate: jest.Mock; + let mockRegistryDecode: jest.Mock; + let publishMessage: (message: Partial) => Promise; + + beforeEach(() => { + let mockEachMessage: jest.Mocked = jest.fn(async () => {}); + mockConsumerConnect = jest.fn(); + mockConsumerSubscribe = jest.fn(); + mockConsumerRun = jest.fn(({ eachMessage }: ConsumerRunConfig) => { + if (eachMessage) { + mockEachMessage = eachMessage; + } + }); + mockConsumerDisconnect = jest.fn(); + mockConsumerCreate = jest.fn(() => + mock({ + connect: mockConsumerConnect, + subscribe: mockConsumerSubscribe, + run: mockConsumerRun, + disconnect: mockConsumerDisconnect, + }), + ); + + publishMessage = async (message: Partial) => { + await mockEachMessage({ + message: { + attributes: 1, + key: Buffer.from('messageKey'), + offset: '0', + timestamp: new Date().toISOString(), + value: Buffer.from('message'), + headers: {} as IHeaders, + ...message, + } as RecordBatchEntry, + partition: 0, + topic: 'test-topic', + heartbeat: jest.fn(), + pause: jest.fn(), + }); + }; + + mockKafka = mock({ + consumer: mockConsumerCreate, + }); + + mockRegistryDecode = jest.fn().mockResolvedValue({ data: 'decoded-data' }); + mockRegistry = mock({ + decode: mockRegistryDecode, + }); + + (Kafka as jest.Mock).mockReturnValue(mockKafka); + (SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry); + }); + + it('should connect to Kafka and subscribe to topic', async () => { + const { close, emit } = await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + options: { + fromBeginning: true, + parallelProcessing: true, + }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + expect(Kafka).toHaveBeenCalledWith({ + clientId: 'n8n-kafka', + brokers: ['localhost:9092'], + ssl: false, + logLevel: logLevel.ERROR, + }); + + expect(mockConsumerCreate).toHaveBeenCalledWith({ + groupId: 'test-group', + maxInFlightRequests: null, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + expect(mockConsumerConnect).toHaveBeenCalled(); + expect(mockConsumerSubscribe).toHaveBeenCalledWith({ + topic: 'test-topic', + fromBeginning: true, + }); + expect(mockConsumerRun).toHaveBeenCalled(); + + await publishMessage({ + value: Buffer.from('message'), + }); + expect(emit).toHaveBeenCalledWith([[{ json: { message: 'message', topic: 'test-topic' } }]]); + + await close(); + expect(mockConsumerDisconnect).toHaveBeenCalled(); + }); + + it('should handle authentication when credentials are provided', async () => { + await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: true, + authentication: true, + username: 'test-user', + password: 'test-password', + saslMechanism: 'plain', + }, + }); + + expect(Kafka).toHaveBeenCalledWith({ + clientId: 'n8n-kafka', + brokers: ['localhost:9092'], + ssl: true, + logLevel: logLevel.ERROR, + sasl: { + username: 'test-user', + password: 'test-password', + mechanism: 'plain', + }, + }); + }); + + it('should throw an error if authentication is enabled but credentials are missing', async () => { + await expect( + testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: true, + }, + }), + ).rejects.toThrow(NodeOperationError); + }); + + it('should use schema registry when enabled', async () => { + const { emit } = await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: true, + schemaRegistryUrl: 'http://localhost:8081', + options: { parallelProcessing: true }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + await publishMessage({ + value: Buffer.from('test-message'), + headers: { 'content-type': Buffer.from('application/json') }, + }); + + expect(SchemaRegistry).toHaveBeenCalledWith({ + host: 'http://localhost:8081', + }); + expect(mockRegistryDecode).toHaveBeenCalledWith(Buffer.from('test-message')); + expect(emit).toHaveBeenCalledWith([ + [ + { + json: { + message: { data: 'decoded-data' }, + topic: 'test-topic', + }, + }, + ], + ]); + }); + + it('should parse JSON message when jsonParseMessage is true', async () => { + const { emit } = await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + options: { + jsonParseMessage: true, + parallelProcessing: true, + onlyMessage: true, + }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + const jsonData = { foo: 'bar' }; + + await publishMessage({ + value: Buffer.from(JSON.stringify(jsonData)), + }); + + expect(emit).toHaveBeenCalledWith([[{ json: jsonData }]]); + }); + + it('should include headers when returnHeaders is true', async () => { + const { emit } = await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + typeVersion: 1, + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + options: { + returnHeaders: true, + }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + await publishMessage({ + value: Buffer.from('test-message'), + headers: { + 'content-type': Buffer.from('application/json'), + 'correlation-id': '123456', + 'with-array-value': ['1', '2', '3'], + empty: undefined, + }, + }); + + expect(emit).toHaveBeenCalledWith([ + [ + { + json: { + message: 'test-message', + topic: 'test-topic', + headers: { + 'content-type': 'application/json', + 'correlation-id': '123456', + 'with-array-value': '1,2,3', + empty: '', + }, + }, + }, + ], + ]); + }); + + it('should handle manual trigger mode', async () => { + const { emit } = await testTriggerNode(KafkaTrigger, { + mode: 'manual', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + options: { + parallelProcessing: true, + }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + expect(mockConsumerConnect).toHaveBeenCalledTimes(1); + expect(mockConsumerSubscribe).toHaveBeenCalledTimes(1); + expect(mockConsumerRun).toHaveBeenCalledTimes(1); + + expect(emit).not.toHaveBeenCalled(); + + await publishMessage({ value: Buffer.from('test') }); + + expect(emit).toHaveBeenCalledWith([[{ json: { message: 'test', topic: 'test-topic' } }]]); + }); + + it('should handle sequential processing when parallelProcessing is false', async () => { + const { emit } = await testTriggerNode(KafkaTrigger, { + mode: 'trigger', + node: { + parameters: { + topic: 'test-topic', + groupId: 'test-group', + useSchemaRegistry: false, + options: { + parallelProcessing: false, + }, + }, + }, + credential: { + brokers: 'localhost:9092', + clientId: 'n8n-kafka', + ssl: false, + authentication: false, + }, + }); + + const publishPromise = publishMessage({ + value: Buffer.from('test-message'), + }); + + expect(emit).toHaveBeenCalled(); + + const deferredPromise = emit.mock.calls[0][2]; + expect(deferredPromise).toBeDefined(); + + deferredPromise?.resolve(mock()); + await publishPromise; + }); +}); diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 78b4446917..31c122b2f6 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -183,7 +183,7 @@ export class KafkaTrigger implements INodeType { const credentials = await this.getCredentials('kafka'); - const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim()); + const brokers = ((credentials.brokers as string) ?? '').split(',').map((item) => item.trim()); const clientId = credentials.clientId as string; @@ -214,14 +214,19 @@ export class KafkaTrigger implements INodeType { } as SASLOptions; } - const kafka = new apacheKafka(config); - const maxInFlightRequests = ( this.getNodeParameter('options.maxInFlightRequests', null) === 0 ? null : this.getNodeParameter('options.maxInFlightRequests', null) ) as number; + const parallelProcessing = options.parallelProcessing as boolean; + + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + + const kafka = new apacheKafka(config); const consumer = kafka.consumer({ groupId, maxInFlightRequests, @@ -229,17 +234,16 @@ export class KafkaTrigger implements INodeType { heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, }); - const parallelProcessing = options.parallelProcessing as boolean; - - await consumer.connect(); - - await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false }); - - const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; - - const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + // The "closeFunction" function gets called by n8n whenever + // the workflow gets deactivated and can so clean up. + async function closeFunction() { + await consumer.disconnect(); + } const startConsumer = async () => { + await consumer.connect(); + + await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false }); await consumer.run({ autoCommitInterval: (options.autoCommitInterval as number) || null, autoCommitThreshold: (options.autoCommitThreshold as number) || null, @@ -261,13 +265,12 @@ export class KafkaTrigger implements INodeType { } if (options.returnHeaders && message.headers) { - const headers: { [key: string]: string } = {}; - for (const key of Object.keys(message.headers)) { - const header = message.headers[key]; - headers[key] = header?.toString('utf8') || ''; - } - - data.headers = headers; + data.headers = Object.fromEntries( + Object.entries(message.headers).map(([headerKey, headerValue]) => [ + headerKey, + headerValue?.toString('utf8') ?? '', + ]), + ); } data.message = value; @@ -291,27 +294,24 @@ export class KafkaTrigger implements INodeType { }); }; - await startConsumer(); - - // The "closeFunction" function gets called by n8n whenever - // the workflow gets deactivated and can so clean up. - async function closeFunction() { - await consumer.disconnect(); - } - - // The "manualTriggerFunction" function gets called by n8n - // when a user is in the workflow editor and starts the - // workflow manually. So the function has to make sure that - // the emit() gets called with similar data like when it - // would trigger by itself so that the user knows what data - // to expect. - async function manualTriggerFunction() { + if (this.getMode() !== 'manual') { await startConsumer(); - } + return { closeFunction }; + } else { + // The "manualTriggerFunction" function gets called by n8n + // when a user is in the workflow editor and starts the + // workflow manually. So the function has to make sure that + // the emit() gets called with similar data like when it + // would trigger by itself so that the user knows what data + // to expect. + async function manualTriggerFunction() { + await startConsumer(); + } - return { - closeFunction, - manualTriggerFunction, - }; + return { + closeFunction, + manualTriggerFunction, + }; + } } } diff --git a/packages/nodes-base/nodes/Kafka/test/workflow.json b/packages/nodes-base/nodes/Kafka/test/workflow.json new file mode 100644 index 0000000000..acc5419a06 --- /dev/null +++ b/packages/nodes-base/nodes/Kafka/test/workflow.json @@ -0,0 +1,134 @@ +{ + "name": "Kafka test", + "nodes": [ + { + "parameters": {}, + "type": "n8n-nodes-base.manualTrigger", + "typeVersion": 1, + "position": [0, -100], + "id": "d0594d58-ebb3-4dc0-a241-3f2531212fd7", + "name": "When clicking ‘Test workflow’" + }, + { + "parameters": { + "topic": "test-topic", + "useKey": true, + "key": "messageKey", + "headersUi": { + "headerValues": [ + { + "key": "header", + "value": "value" + } + ] + }, + "options": { + "acks": true, + "compression": true, + "timeout": 1000 + } + }, + "type": "n8n-nodes-base.kafka", + "typeVersion": 1, + "position": [440, -200], + "id": "f29d6af7-9ded-421a-8ada-cea80eac9464", + "name": "Send Input Data", + "credentials": { + "kafka": { + "id": "JJBjHkOrIfcj91EX", + "name": "Kafka account" + } + } + }, + { + "parameters": { + "topic": "test-topic", + "sendInputData": false, + "message": "={{ JSON.stringify({foo: 'bar'}) }}", + "jsonParameters": true, + "useSchemaRegistry": true, + "schemaRegistryUrl": "https://test-kafka-registry.local", + "eventName": "test-event-name", + "headerParametersJson": "{\n \"headerKey\": \"headerValue\"\n}", + "options": {} + }, + "type": "n8n-nodes-base.kafka", + "typeVersion": 1, + "position": [440, 0], + "id": "d851834f-6b97-445d-8e69-cc2e873bdf80", + "name": "Schema Registry", + "credentials": { + "kafka": { + "id": "JJBjHkOrIfcj91EX", + "name": "Kafka account" + } + } + }, + { + "parameters": { + "jsCode": "return [\n {\n \"name\": \"First item\",\n \"code\": 1\n },\n {\n \"name\": \"Second item\",\n \"code\": 2\n }\n]" + }, + "type": "n8n-nodes-base.code", + "typeVersion": 2, + "position": [220, -100], + "id": "50ce815c-cf9a-4d83-8739-c95f9c3d7ec6", + "name": "Test Data" + } + ], + "pinData": { + "Send Input Data": [ + { + "json": { + "success": true + } + } + ], + "Schema Registry": [ + { + "json": { + "success": true + } + } + ] + }, + "connections": { + "When clicking ‘Test workflow’": { + "main": [ + [ + { + "node": "Test Data", + "type": "main", + "index": 0 + } + ] + ] + }, + "Test Data": { + "main": [ + [ + { + "node": "Schema Registry", + "type": "main", + "index": 0 + }, + { + "node": "Send Input Data", + "type": "main", + "index": 0 + } + ] + ] + } + }, + "active": false, + "settings": { + "executionOrder": "v1" + }, + "versionId": "be4cbb16-225f-41ed-b897-895aaa34ea34", + "meta": { + "templateCredsSetupCompleted": true, + "instanceId": "27cc9b56542ad45b38725555722c50a1c3fee1670bbb67980558314ee08517c4" + }, + "id": "r7XhZVcfhaGvCbgE", + "tags": [] +} diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 8700afb86a..d348beb770 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -865,7 +865,7 @@ }, "dependencies": { "@aws-sdk/client-sso-oidc": "3.666.0", - "@kafkajs/confluent-schema-registry": "1.0.6", + "@kafkajs/confluent-schema-registry": "3.8.0", "@n8n/config": "workspace:*", "@n8n/di": "workspace:*", "@n8n/imap": "workspace:*", @@ -893,7 +893,7 @@ "iso-639-1": "2.1.15", "js-nacl": "1.4.0", "jsonwebtoken": "9.0.2", - "kafkajs": "1.16.0", + "kafkajs": "2.2.4", "ldapts": "4.2.6", "lodash": "catalog:", "lossless-json": "1.0.5", diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index ce31937992..32cfabf686 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -6,20 +6,21 @@ import set from 'lodash/set'; import { PollContext, returnJsonArray } from 'n8n-core'; import type { InstanceSettings, ExecutionLifecycleHooks } from 'n8n-core'; import { ScheduledTaskManager } from 'n8n-core/dist/execution-engine/scheduled-task-manager'; -import type { - IBinaryData, - ICredentialDataDecryptedObject, - IDataObject, - IHttpRequestOptions, - INode, - INodeType, - INodeTypes, - ITriggerFunctions, - IWebhookFunctions, - IWorkflowExecuteAdditionalData, - NodeTypeAndVersion, - VersionedNodeType, - Workflow, +import { + createDeferredPromise, + type IBinaryData, + type ICredentialDataDecryptedObject, + type IDataObject, + type IHttpRequestOptions, + type INode, + type INodeType, + type INodeTypes, + type ITriggerFunctions, + type IWebhookFunctions, + type IWorkflowExecuteAdditionalData, + type NodeTypeAndVersion, + type VersionedNodeType, + type Workflow, } from 'n8n-workflow'; type MockDeepPartial = Parameters>[0]; @@ -75,6 +76,7 @@ export async function testTriggerNode( const scheduledTaskManager = new ScheduledTaskManager(mock()); const helpers = mock({ + createDeferredPromise, returnJsonArray, registerCron: (cronExpression, onTick) => scheduledTaskManager.registerCron(workflow, cronExpression, onTick), @@ -85,6 +87,8 @@ export async function testTriggerNode( emit, getTimezone: () => timezone, getNode: () => node, + getCredentials: async () => + (options.credential ?? {}) as T, getMode: () => options.mode ?? 'trigger', getWorkflowStaticData: () => options.workflowStaticData ?? {}, getNodeParameter: (parameterName, fallback) => get(node.parameters, parameterName) ?? fallback, @@ -95,8 +99,6 @@ export async function testTriggerNode( if (options.mode === 'manual') { expect(response?.manualTriggerFunction).toBeInstanceOf(Function); await response?.manualTriggerFunction?.(); - } else { - expect(response?.manualTriggerFunction).toBeUndefined(); } return { @@ -164,6 +166,8 @@ export async function testWebhookTriggerNode( getWorkflowStaticData: () => options.workflowStaticData ?? {}, getNodeParameter: (parameterName, fallback) => get(node.parameters, parameterName) ?? fallback, getChildNodes: () => options.childNodes ?? [], + getCredentials: async () => + (options.credential ?? {}) as T, }); const responseData = await trigger.webhook?.call(webhookFunctions); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 64c42c80e0..a6f84fc806 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1943,8 +1943,8 @@ importers: specifier: 3.666.0 version: 3.666.0(@aws-sdk/client-sts@3.666.0) '@kafkajs/confluent-schema-registry': - specifier: 1.0.6 - version: 1.0.6 + specifier: 3.8.0 + version: 3.8.0 '@n8n/config': specifier: workspace:* version: link:../@n8n/config @@ -2027,8 +2027,8 @@ importers: specifier: 9.0.2 version: 9.0.2 kafkajs: - specifier: 1.16.0 - version: 1.16.0 + specifier: 2.2.4 + version: 2.2.4 ldapts: specifier: 4.2.6 version: 4.2.6 @@ -3973,8 +3973,8 @@ packages: '@jsdevtools/ono@7.1.3': resolution: {integrity: sha512-4JQNk+3mVzK3xh2rqd6RB4J46qUR19azEHBneZyTZM+c456qOrbbM/5xcR8huNCCcbVt7+UmizG6GuUvPvKUYg==} - '@kafkajs/confluent-schema-registry@1.0.6': - resolution: {integrity: sha512-NrZL1peOIlmlLKvheQcJAx9PHdnc4kaW+9+Yt4jXUfbbYR9EFNCZt6yApI4SwlFilaiZieReM6XslWy1LZAvoQ==} + '@kafkajs/confluent-schema-registry@3.8.0': + resolution: {integrity: sha512-33iCTcNofWznLAy9YcfPmUVoArTzRHUOl+s79Br3+rRvwtNqRueIRBrPwGuA4tYA24VHux77qekSy0yNTHVoeA==} '@kurkle/color@0.3.2': resolution: {integrity: sha512-fuscdXJ9G1qb7W8VdHi+IwRqij3lBkosAm4ydQtEmbY58OzHXqQhvlxqEkoz0yssNVn38bcpRWgA9PP+OGoisw==} @@ -6672,6 +6672,9 @@ packages: ajv@6.12.6: resolution: {integrity: sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==} + ajv@7.2.4: + resolution: {integrity: sha512-nBeQgg/ZZA3u3SYxyaDvpvDtgZ/EZPF547ARgZBrG9Bhu1vKDwAIjtIf+sDtJUKa2zOcEbmRLBRSyMraS/Oy1A==} + ajv@8.12.0: resolution: {integrity: sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==} @@ -9850,9 +9853,9 @@ packages: jws@4.0.0: resolution: {integrity: sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==} - kafkajs@1.16.0: - resolution: {integrity: sha512-+Rcfu2hyQ/jv5skqRY8xA7Ra+mmRkDAzCaLDYbkGtgsNKpzxPWiLbk8ub0dgr4EbWrN1Zb4BCXHUkD6+zYfdWg==} - engines: {node: '>=10.13.0'} + kafkajs@2.2.4: + resolution: {integrity: sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==} + engines: {node: '>=14.0.0'} kleur@3.0.3: resolution: {integrity: sha512-eTIzlVOSUR+JxdDFepEYcBMtZ9Qqdef+rnzWdRZuMbOywu5tO2w2N7rqjoANZ5k9vywhL6Br1VRjUIgTQx4E8w==} @@ -10262,8 +10265,8 @@ packages: map-stream@0.1.0: resolution: {integrity: sha512-CkYQrPYZfWnu/DAmVCpTSX/xHpKZ80eKh2lAkyA6AJTef6bW+6JpbQZN5rofum7da+SyN1bi5ctTm+lTfcCW3g==} - mappersmith@2.43.4: - resolution: {integrity: sha512-IyUw53aE3/SPH3eOkqSuD+Hcstpcl4dpxDDgZsPz65R2SlOikq0VHxo3kMPzUVvw7cCHunTmlpNXl5n/KzPcpg==} + mappersmith@2.45.0: + resolution: {integrity: sha512-N/Kkx9RqJenkvMHPMY0VS1geAara0VQTwup5Abv2GB19QBT7w+epjhRQMLW5jtz2DXUdkh7KD3F3prqJKG1A8w==} mark.js@8.11.1: resolution: {integrity: sha512-1I+1qpDt4idfgLQG+BNWmrqku+7/2bi5nLf4YwF8y8zXvmfiTBY3PV3ZibfrjBueCByROpuBjLLFCajqkgYoLQ==} @@ -11565,6 +11568,10 @@ packages: resolution: {integrity: sha512-YWD03n3shzV9ImZRX3ccbjqLxj7NokGN0V/ESiBV5xWqrommYHYiihuIyavq03pWSGqlyvYUFmfoMKd+1rPA/g==} engines: {node: '>=12.0.0'} + protobufjs@7.4.0: + resolution: {integrity: sha512-mRUWCc3KUU4w1jU8sGxICXH/gNS94DvI1gxqDvBzhj1JpcsimQkYiOJfwsPUykUI5ZaspFbSgmBLER8IrQ3tqw==} + engines: {node: '>=12.0.0'} + proxy-addr@2.0.7: resolution: {integrity: sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==} engines: {node: '>= 0.10'} @@ -16445,10 +16452,12 @@ snapshots: '@jsdevtools/ono@7.1.3': {} - '@kafkajs/confluent-schema-registry@1.0.6': + '@kafkajs/confluent-schema-registry@3.8.0': dependencies: + ajv: 7.2.4 avsc: 5.7.6 - mappersmith: 2.43.4 + mappersmith: 2.45.0 + protobufjs: 7.4.0 '@kurkle/color@0.3.2': {} @@ -19596,6 +19605,13 @@ snapshots: json-schema-traverse: 0.4.1 uri-js: 4.4.1 + ajv@7.2.4: + dependencies: + fast-deep-equal: 3.1.3 + json-schema-traverse: 1.0.0 + require-from-string: 2.0.2 + uri-js: 4.4.1 + ajv@8.12.0: dependencies: fast-deep-equal: 3.1.3 @@ -23626,7 +23642,7 @@ snapshots: jwa: 2.0.0 safe-buffer: 5.2.1 - kafkajs@1.16.0: {} + kafkajs@2.2.4: {} kleur@3.0.3: {} @@ -24028,7 +24044,7 @@ snapshots: map-stream@0.1.0: {} - mappersmith@2.43.4: {} + mappersmith@2.45.0: {} mark.js@8.11.1: {} @@ -25539,6 +25555,21 @@ snapshots: '@types/node': 18.16.16 long: 5.2.3 + protobufjs@7.4.0: + dependencies: + '@protobufjs/aspromise': 1.1.2 + '@protobufjs/base64': 1.1.2 + '@protobufjs/codegen': 2.0.4 + '@protobufjs/eventemitter': 1.1.0 + '@protobufjs/fetch': 1.1.0 + '@protobufjs/float': 1.0.2 + '@protobufjs/inquire': 1.1.0 + '@protobufjs/path': 1.1.2 + '@protobufjs/pool': 1.1.0 + '@protobufjs/utf8': 1.1.0 + '@types/node': 18.16.16 + long: 5.2.3 + proxy-addr@2.0.7: dependencies: forwarded: 0.2.0