mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 18:12:04 +00:00
feat(Merge Node): Better pairedItem mapping in combineBySql operation if SELECT query (#13849)
Co-authored-by: Shireen Missi <94372015+ShireenMissi@users.noreply.github.com>
This commit is contained in:
@@ -14,7 +14,7 @@ export class Merge extends VersionedNodeType {
|
|||||||
group: ['transform'],
|
group: ['transform'],
|
||||||
subtitle: '={{$parameter["mode"]}}',
|
subtitle: '={{$parameter["mode"]}}',
|
||||||
description: 'Merges data of multiple streams once data from both is available',
|
description: 'Merges data of multiple streams once data from both is available',
|
||||||
defaultVersion: 3,
|
defaultVersion: 3.1,
|
||||||
};
|
};
|
||||||
|
|
||||||
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
|
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
|
||||||
@@ -22,6 +22,7 @@ export class Merge extends VersionedNodeType {
|
|||||||
2: new MergeV2(baseDescription),
|
2: new MergeV2(baseDescription),
|
||||||
2.1: new MergeV2(baseDescription),
|
2.1: new MergeV2(baseDescription),
|
||||||
3: new MergeV3(baseDescription),
|
3: new MergeV3(baseDescription),
|
||||||
|
3.1: new MergeV3(baseDescription),
|
||||||
};
|
};
|
||||||
|
|
||||||
super(nodeVersions, baseDescription);
|
super(nodeVersions, baseDescription);
|
||||||
|
|||||||
@@ -259,6 +259,142 @@ describe('Test MergeV3, combineBySql operation', () => {
|
|||||||
country: 'PL',
|
country: 'PL',
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should collect pairedItems data, if version >= 3.1 and SELECT query', async () => {
|
||||||
|
const nodeParameters: IDataObject = {
|
||||||
|
operation: 'combineBySql',
|
||||||
|
query: 'SELECT name FROM input1 LEFT JOIN input2 ON input1.id = input2.id',
|
||||||
|
};
|
||||||
|
|
||||||
|
const inputs = [
|
||||||
|
[
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 1,
|
||||||
|
data: 'a',
|
||||||
|
name: 'Sam',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 0,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 2,
|
||||||
|
data: 'b',
|
||||||
|
name: 'Dan',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 1,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 3,
|
||||||
|
data: 'c',
|
||||||
|
name: 'Jon',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 2,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 1,
|
||||||
|
data: 'aa',
|
||||||
|
country: 'PL',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 0,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 2,
|
||||||
|
data: 'bb',
|
||||||
|
country: 'FR',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 1,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
id: 3,
|
||||||
|
data: 'cc',
|
||||||
|
country: 'UA',
|
||||||
|
},
|
||||||
|
pairedItem: {
|
||||||
|
item: 2,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
const returnData = await mode.combineBySql.execute.call(
|
||||||
|
createMockExecuteFunction(nodeParameters, { ...node, typeVersion: 3.1 }),
|
||||||
|
inputs,
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(returnData.length).toEqual(1);
|
||||||
|
expect(returnData).toEqual([
|
||||||
|
[
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
name: 'Sam',
|
||||||
|
},
|
||||||
|
pairedItem: [
|
||||||
|
{
|
||||||
|
item: 0,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
item: 0,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
name: 'Dan',
|
||||||
|
},
|
||||||
|
pairedItem: [
|
||||||
|
{
|
||||||
|
item: 1,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
item: 1,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
json: {
|
||||||
|
name: 'Jon',
|
||||||
|
},
|
||||||
|
pairedItem: [
|
||||||
|
{
|
||||||
|
item: 2,
|
||||||
|
input: undefined,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
item: 2,
|
||||||
|
input: 1,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
]);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Test MergeV3, append operation', () => {
|
describe('Test MergeV3, append operation', () => {
|
||||||
|
|||||||
92
packages/nodes-base/nodes/Merge/test/v3/utils.test.ts
Normal file
92
packages/nodes-base/nodes/Merge/test/v3/utils.test.ts
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
import type { IDataObject, INodeExecutionData, IPairedItemData } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { modifySelectQuery, rowToExecutionData } from '../../v3/helpers/utils';
|
||||||
|
|
||||||
|
describe('rowToExecutionData', () => {
|
||||||
|
test('should return empty json and pairedItem when input is empty', () => {
|
||||||
|
const input: IDataObject = {};
|
||||||
|
const result = rowToExecutionData(input);
|
||||||
|
expect(result).toEqual({ json: {}, pairedItem: [] });
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should separate json properties and pairedItem properties', () => {
|
||||||
|
const input: IDataObject = {
|
||||||
|
key1: 'value1',
|
||||||
|
key2: 42,
|
||||||
|
pairedItem1: { item: 0, input: undefined } as IPairedItemData,
|
||||||
|
pairedItem2: { item: 0, input: 1 } as IPairedItemData,
|
||||||
|
};
|
||||||
|
|
||||||
|
const expectedOutput: INodeExecutionData = {
|
||||||
|
json: { key1: 'value1', key2: 42 },
|
||||||
|
pairedItem: [
|
||||||
|
{ item: 0, input: undefined },
|
||||||
|
{ item: 0, input: 1 },
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
expect(rowToExecutionData(input)).toEqual(expectedOutput);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should ignore undefined pairedItem values', () => {
|
||||||
|
const input: IDataObject = {
|
||||||
|
key: 'value',
|
||||||
|
pairedItem1: { item: 0, input: undefined } as IPairedItemData,
|
||||||
|
pairedItem2: undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
const expectedOutput: INodeExecutionData = {
|
||||||
|
json: { key: 'value' },
|
||||||
|
pairedItem: [{ item: 0, input: undefined }],
|
||||||
|
};
|
||||||
|
|
||||||
|
expect(rowToExecutionData(input)).toEqual(expectedOutput);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should handle only json properties without pairedItem', () => {
|
||||||
|
const input: IDataObject = {
|
||||||
|
name: 'Alice',
|
||||||
|
age: 30,
|
||||||
|
};
|
||||||
|
|
||||||
|
const expectedOutput: INodeExecutionData = {
|
||||||
|
json: { name: 'Alice', age: 30 },
|
||||||
|
pairedItem: [],
|
||||||
|
};
|
||||||
|
|
||||||
|
expect(rowToExecutionData(input)).toEqual(expectedOutput);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('modifySelectQuery', () => {
|
||||||
|
test('should return the original query if no SELECT match is found', () => {
|
||||||
|
const query = 'UPDATE table SET column = 1';
|
||||||
|
expect(modifySelectQuery(query, 2)).toBe(query);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should return the original query if SELECT * is used', () => {
|
||||||
|
const query = 'SELECT * FROM input1';
|
||||||
|
expect(modifySelectQuery(query, 2)).toBe(query);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should append pairedItem columns when input tables exist', () => {
|
||||||
|
const query = 'SELECT column1, column2 FROM input1 WHERE input1.id = table.id';
|
||||||
|
const modifiedQuery = modifySelectQuery(query, 2);
|
||||||
|
expect(modifiedQuery).toBe(
|
||||||
|
'SELECT column1, column2, input1.pairedItem AS pairedItem1 FROM input1 WHERE input1.id = table.id',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should handle multiple input tables correctly', () => {
|
||||||
|
const query = 'SELECT column1 FROM input1 LEFT JOIN input2 ON input1.name = input2.name';
|
||||||
|
const modifiedQuery = modifySelectQuery(query, 2);
|
||||||
|
expect(modifiedQuery).toBe(
|
||||||
|
'SELECT column1, input1.pairedItem AS pairedItem1, input2.pairedItem AS pairedItem2 FROM input1 LEFT JOIN input2 ON input1.name = input2.name',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should not modify query if no input tables are found', () => {
|
||||||
|
const query = 'SELECT column1 FROM table';
|
||||||
|
expect(modifySelectQuery(query, 2)).toBe(query);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,8 +1,11 @@
|
|||||||
|
import { Container } from '@n8n/di';
|
||||||
import alasql from 'alasql';
|
import alasql from 'alasql';
|
||||||
import type { Database } from 'alasql';
|
import type { Database } from 'alasql';
|
||||||
|
import { ErrorReporter } from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
IDataObject,
|
IDataObject,
|
||||||
IExecuteFunctions,
|
IExecuteFunctions,
|
||||||
|
INode,
|
||||||
INodeExecutionData,
|
INodeExecutionData,
|
||||||
INodeProperties,
|
INodeProperties,
|
||||||
IPairedItemData,
|
IPairedItemData,
|
||||||
@@ -12,6 +15,7 @@ import { NodeOperationError } from 'n8n-workflow';
|
|||||||
import { getResolvables, updateDisplayOptions } from '@utils/utilities';
|
import { getResolvables, updateDisplayOptions } from '@utils/utilities';
|
||||||
|
|
||||||
import { numberInputsProperty } from '../../helpers/descriptions';
|
import { numberInputsProperty } from '../../helpers/descriptions';
|
||||||
|
import { modifySelectQuery, rowToExecutionData } from '../../helpers/utils';
|
||||||
|
|
||||||
export const properties: INodeProperties[] = [
|
export const properties: INodeProperties[] = [
|
||||||
numberInputsProperty,
|
numberInputsProperty,
|
||||||
@@ -39,15 +43,102 @@ const displayOptions = {
|
|||||||
|
|
||||||
export const description = updateDisplayOptions(displayOptions, properties);
|
export const description = updateDisplayOptions(displayOptions, properties);
|
||||||
|
|
||||||
|
const prepareError = (node: INode, error: Error) => {
|
||||||
|
let message = '';
|
||||||
|
if (typeof error === 'string') {
|
||||||
|
message = error;
|
||||||
|
} else {
|
||||||
|
message = error.message;
|
||||||
|
}
|
||||||
|
throw new NodeOperationError(node, error, {
|
||||||
|
message: 'Issue while executing query',
|
||||||
|
description: message,
|
||||||
|
itemIndex: 0,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
async function executeSelectWithMappedPairedItems(
|
||||||
|
node: INode,
|
||||||
|
inputsData: INodeExecutionData[][],
|
||||||
|
query: string,
|
||||||
|
): Promise<INodeExecutionData[][]> {
|
||||||
|
const returnData: INodeExecutionData[] = [];
|
||||||
|
|
||||||
|
const db: typeof Database = new (alasql as any).Database(node.id);
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (let i = 0; i < inputsData.length; i++) {
|
||||||
|
const inputData = inputsData[i];
|
||||||
|
|
||||||
|
db.exec(`CREATE TABLE input${i + 1}`);
|
||||||
|
db.tables[`input${i + 1}`].data = inputData.map((entry) => ({
|
||||||
|
...entry.json,
|
||||||
|
pairedItem: entry.pairedItem,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
throw new NodeOperationError(node, error, {
|
||||||
|
message: 'Issue while creating table from',
|
||||||
|
description: error.message,
|
||||||
|
itemIndex: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result: IDataObject[] = db.exec(modifySelectQuery(query, inputsData.length));
|
||||||
|
|
||||||
|
for (const item of result) {
|
||||||
|
if (Array.isArray(item)) {
|
||||||
|
returnData.push(...item.map((entry) => rowToExecutionData(entry)));
|
||||||
|
} else if (typeof item === 'object') {
|
||||||
|
returnData.push(rowToExecutionData(item));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!returnData.length) {
|
||||||
|
returnData.push({ json: { success: true } });
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
prepareError(node, error as Error);
|
||||||
|
} finally {
|
||||||
|
delete alasql.databases[node.id];
|
||||||
|
}
|
||||||
|
|
||||||
|
return [returnData];
|
||||||
|
}
|
||||||
|
|
||||||
export async function execute(
|
export async function execute(
|
||||||
this: IExecuteFunctions,
|
this: IExecuteFunctions,
|
||||||
inputsData: INodeExecutionData[][],
|
inputsData: INodeExecutionData[][],
|
||||||
): Promise<INodeExecutionData[][]> {
|
): Promise<INodeExecutionData[][]> {
|
||||||
const nodeId = this.getNode().id;
|
const node = this.getNode();
|
||||||
const returnData: INodeExecutionData[] = [];
|
const returnData: INodeExecutionData[] = [];
|
||||||
const pairedItem: IPairedItemData[] = [];
|
const pairedItem: IPairedItemData[] = [];
|
||||||
|
|
||||||
const db: typeof Database = new (alasql as any).Database(nodeId);
|
let query = this.getNodeParameter('query', 0) as string;
|
||||||
|
|
||||||
|
for (const resolvable of getResolvables(query)) {
|
||||||
|
query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string);
|
||||||
|
}
|
||||||
|
|
||||||
|
const isSelectQuery = node.typeVersion >= 3.1 ? query.toLowerCase().startsWith('select') : false;
|
||||||
|
|
||||||
|
if (isSelectQuery) {
|
||||||
|
try {
|
||||||
|
return await executeSelectWithMappedPairedItems(node, inputsData, query);
|
||||||
|
} catch (error) {
|
||||||
|
Container.get(ErrorReporter).error(error, {
|
||||||
|
extra: {
|
||||||
|
nodeName: node.name,
|
||||||
|
nodeType: node.type,
|
||||||
|
nodeVersion: node.typeVersion,
|
||||||
|
workflowId: this.getWorkflow().id,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const db: typeof Database = new (alasql as any).Database(node.id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (let i = 0; i < inputsData.length; i++) {
|
for (let i = 0; i < inputsData.length; i++) {
|
||||||
@@ -90,7 +181,7 @@ export async function execute(
|
|||||||
db.tables[`input${i + 1}`].data = inputData.map((entry) => entry.json);
|
db.tables[`input${i + 1}`].data = inputData.map((entry) => entry.json);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
throw new NodeOperationError(this.getNode(), error, {
|
throw new NodeOperationError(node, error, {
|
||||||
message: 'Issue while creating table from',
|
message: 'Issue while creating table from',
|
||||||
description: error.message,
|
description: error.message,
|
||||||
itemIndex: 0,
|
itemIndex: 0,
|
||||||
@@ -98,12 +189,6 @@ export async function execute(
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let query = this.getNodeParameter('query', 0) as string;
|
|
||||||
|
|
||||||
for (const resolvable of getResolvables(query)) {
|
|
||||||
query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string);
|
|
||||||
}
|
|
||||||
|
|
||||||
const result: IDataObject[] = db.exec(query);
|
const result: IDataObject[] = db.exec(query);
|
||||||
|
|
||||||
for (const item of result) {
|
for (const item of result) {
|
||||||
@@ -118,20 +203,10 @@ export async function execute(
|
|||||||
returnData.push({ json: { success: true }, pairedItem });
|
returnData.push({ json: { success: true }, pairedItem });
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
let message = '';
|
prepareError(node, error as Error);
|
||||||
if (typeof error === 'string') {
|
} finally {
|
||||||
message = error;
|
delete alasql.databases[node.id];
|
||||||
} else {
|
|
||||||
message = error.message;
|
|
||||||
}
|
}
|
||||||
throw new NodeOperationError(this.getNode(), error, {
|
|
||||||
message: 'Issue while executing query',
|
|
||||||
description: message,
|
|
||||||
itemIndex: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
delete alasql.databases[nodeId];
|
|
||||||
|
|
||||||
return [returnData];
|
return [returnData];
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ export const versionDescription: INodeTypeDescription = {
|
|||||||
name: 'merge',
|
name: 'merge',
|
||||||
group: ['transform'],
|
group: ['transform'],
|
||||||
description: 'Merges data of multiple streams once data from both is available',
|
description: 'Merges data of multiple streams once data from both is available',
|
||||||
version: [3],
|
version: [3, 3.1],
|
||||||
defaults: {
|
defaults: {
|
||||||
name: 'Merge',
|
name: 'Merge',
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -386,3 +386,43 @@ export function getNodeInputsData(this: IExecuteFunctions) {
|
|||||||
|
|
||||||
return returnData;
|
return returnData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const rowToExecutionData = (data: IDataObject): INodeExecutionData => {
|
||||||
|
const keys = Object.keys(data);
|
||||||
|
const pairedItem: IPairedItemData[] = [];
|
||||||
|
const json: IDataObject = {};
|
||||||
|
|
||||||
|
for (const key of keys) {
|
||||||
|
if (key.startsWith('pairedItem')) {
|
||||||
|
if (data[key] === undefined) continue;
|
||||||
|
pairedItem.push(data[key] as IPairedItemData);
|
||||||
|
} else {
|
||||||
|
json[key] = data[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { json, pairedItem };
|
||||||
|
};
|
||||||
|
|
||||||
|
export function modifySelectQuery(userQuery: string, inputLength: number): string {
|
||||||
|
const selectMatch = userQuery.match(/SELECT\s+(.+?)\s+FROM/i);
|
||||||
|
if (!selectMatch) return userQuery;
|
||||||
|
|
||||||
|
let selectedColumns = selectMatch[1].trim();
|
||||||
|
|
||||||
|
if (selectedColumns === '*') {
|
||||||
|
return userQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pairedItemColumns = [];
|
||||||
|
|
||||||
|
for (let i = 1; i <= inputLength; i++) {
|
||||||
|
if (userQuery.includes(`input${i}`)) {
|
||||||
|
pairedItemColumns.push(`input${i}.pairedItem AS pairedItem${i}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
selectedColumns += pairedItemColumns.length ? ', ' + pairedItemColumns.join(', ') : '';
|
||||||
|
|
||||||
|
return userQuery.replace(selectMatch[0], `SELECT ${selectedColumns} FROM`);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user