Allow fromBeginning config in kafka trigger node (#1958)

* Allow fromBeginning config in kafka trigger node

* make sure options in defined
This commit is contained in:
Pierre Lanvin
2021-07-14 20:31:43 +02:00
committed by GitHub
parent c983603306
commit 58f0d7cffc

View File

@@ -70,6 +70,13 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Allow sending message to a previously non exisiting topic .',
},
{
displayName: 'Read messages from beginning',
name: 'fromBeginning',
type: 'boolean',
default: true,
description: 'Read message from beginning .',
},
{
displayName: 'JSON Parse Message',
name: 'jsonParseMessage',
@@ -140,13 +147,13 @@ export class KafkaTrigger implements INodeType {
const consumer = kafka.consumer({ groupId });
await consumer.connect();
const options = this.getNodeParameter('options', {}) as IDataObject;
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.subscribe({ topic, fromBeginning: (options.fromBeginning)? true : false });
const self = this;
const options = this.getNodeParameter('options', {}) as IDataObject;
const startConsumer = async () => {
await consumer.run({
eachMessage: async ({ topic, message }) => {