Merge branch 'master' of github.com:rgeorgel/n8n

This commit is contained in:
Ricardo Georgel
2021-07-20 18:26:40 -03:00
183 changed files with 40091 additions and 34042 deletions

View File

@@ -207,126 +207,134 @@ export class Kafka implements INodeType {
let responseData: IDataObject[];
const options = this.getNodeParameter('options', 0) as IDataObject;
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
try {
const options = this.getNodeParameter('options', 0) as IDataObject;
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const timeout = options.timeout as number;
const timeout = options.timeout as number;
let compression = CompressionTypes.None;
let compression = CompressionTypes.None;
const acks = (options.acks === true) ? 1 : 0;
const acks = (options.acks === true) ? 1 : 0;
if (options.compression === true) {
compression = CompressionTypes.GZIP;
}
const credentials = this.getCredentials('kafka') as IDataObject;
const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
const clientId = credentials.clientId as string;
const ssl = credentials.ssl as boolean;
const config: KafkaConfig = {
clientId,
brokers,
ssl,
};
if (credentials.authentication === true) {
if(!(credentials.username && credentials.password)) {
throw new NodeOperationError(this.getNode(), 'Username and password are required for authentication');
}
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
mechanism: credentials.saslMechanism as string,
} as SASLOptions;
}
const kafka = new apacheKafka(config);
const producer = kafka.producer();
await producer.connect();
let message: string | Buffer;
for (let i = 0; i < length; i++) {
if (sendInputData === true) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
if (options.compression === true) {
compression = CompressionTypes.GZIP;
}
if (useSchemaRegistry) {
try {
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const eventName = this.getNodeParameter('eventName', 0) as string;
const credentials = this.getCredentials('kafka') as IDataObject;
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
const id = await registry.getLatestSchemaId(eventName);
const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
message = await registry.encode(id, JSON.parse(message));
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration');
const clientId = credentials.clientId as string;
const ssl = credentials.ssl as boolean;
const config: KafkaConfig = {
clientId,
brokers,
ssl,
};
if (credentials.authentication === true) {
if(!(credentials.username && credentials.password)) {
throw new NodeOperationError(this.getNode(), 'Username and password are required for authentication');
}
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
mechanism: credentials.saslMechanism as string,
} as SASLOptions;
}
const topic = this.getNodeParameter('topic', i) as string;
const kafka = new apacheKafka(config);
const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;
const producer = kafka.producer();
let headers;
await producer.connect();
if (jsonParameters === true) {
headers = this.getNodeParameter('headerParametersJson', i) as string;
try {
headers = JSON.parse(headers);
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Headers must be a valid json');
let message: string | Buffer;
for (let i = 0; i < length; i++) {
if (sendInputData === true) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
}
} else {
const values = (this.getNodeParameter('headersUi', i) as IDataObject).headerValues as IDataObject[];
headers = {};
if (values !== undefined) {
for (const value of values) {
//@ts-ignore
headers[value.key] = value.value;
if (useSchemaRegistry) {
try {
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const eventName = this.getNodeParameter('eventName', 0) as string;
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
const id = await registry.getLatestSchemaId(eventName);
message = await registry.encode(id, JSON.parse(message));
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration');
}
}
const topic = this.getNodeParameter('topic', i) as string;
const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;
let headers;
if (jsonParameters === true) {
headers = this.getNodeParameter('headerParametersJson', i) as string;
try {
headers = JSON.parse(headers);
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Headers must be a valid json');
}
} else {
const values = (this.getNodeParameter('headersUi', i) as IDataObject).headerValues as IDataObject[];
headers = {};
if (values !== undefined) {
for (const value of values) {
//@ts-ignore
headers[value.key] = value.value;
}
}
}
topicMessages.push(
{
topic,
messages: [{
value: message,
headers,
}],
});
}
topicMessages.push(
responseData = await producer.sendBatch(
{
topic,
messages: [{
value: message,
headers,
}],
topicMessages,
timeout,
compression,
acks,
},
);
if (responseData.length === 0) {
responseData.push({
success: true,
});
}
await producer.disconnect();
return [this.helpers.returnJsonArray(responseData)];
} catch (error) {
if (this.continueOnFail()) {
return [this.helpers.returnJsonArray({ error: error.message })];
} else {
throw error;
}
}
responseData = await producer.sendBatch(
{
topicMessages,
timeout,
compression,
acks,
},
);
if (responseData.length === 0) {
responseData.push({
success: true,
});
}
await producer.disconnect();
return [this.helpers.returnJsonArray(responseData)];
}
}