diff --git a/packages/nodes-base/nodes/Merge/Merge.node.ts b/packages/nodes-base/nodes/Merge/Merge.node.ts index 8dcfc71c03..dee9932318 100644 --- a/packages/nodes-base/nodes/Merge/Merge.node.ts +++ b/packages/nodes-base/nodes/Merge/Merge.node.ts @@ -14,7 +14,7 @@ export class Merge extends VersionedNodeType { group: ['transform'], subtitle: '={{$parameter["mode"]}}', description: 'Merges data of multiple streams once data from both is available', - defaultVersion: 3, + defaultVersion: 3.1, }; const nodeVersions: IVersionedNodeType['nodeVersions'] = { @@ -22,6 +22,7 @@ export class Merge extends VersionedNodeType { 2: new MergeV2(baseDescription), 2.1: new MergeV2(baseDescription), 3: new MergeV3(baseDescription), + 3.1: new MergeV3(baseDescription), }; super(nodeVersions, baseDescription); diff --git a/packages/nodes-base/nodes/Merge/test/v3/operations.test.ts b/packages/nodes-base/nodes/Merge/test/v3/operations.test.ts index a318885edc..21f97e0ee2 100644 --- a/packages/nodes-base/nodes/Merge/test/v3/operations.test.ts +++ b/packages/nodes-base/nodes/Merge/test/v3/operations.test.ts @@ -259,6 +259,142 @@ describe('Test MergeV3, combineBySql operation', () => { country: 'PL', }); }); + + it('should collect pairedItems data, if version >= 3.1 and SELECT query', async () => { + const nodeParameters: IDataObject = { + operation: 'combineBySql', + query: 'SELECT name FROM input1 LEFT JOIN input2 ON input1.id = input2.id', + }; + + const inputs = [ + [ + { + json: { + id: 1, + data: 'a', + name: 'Sam', + }, + pairedItem: { + item: 0, + input: undefined, + }, + }, + { + json: { + id: 2, + data: 'b', + name: 'Dan', + }, + pairedItem: { + item: 1, + input: undefined, + }, + }, + { + json: { + id: 3, + data: 'c', + name: 'Jon', + }, + pairedItem: { + item: 2, + input: undefined, + }, + }, + ], + [ + { + json: { + id: 1, + data: 'aa', + country: 'PL', + }, + pairedItem: { + item: 0, + input: 1, + }, + }, + { + json: { + id: 2, + data: 'bb', + country: 'FR', + }, + pairedItem: { + item: 1, + input: 1, + }, + }, + { + json: { + id: 3, + data: 'cc', + country: 'UA', + }, + pairedItem: { + item: 2, + input: 1, + }, + }, + ], + ]; + + const returnData = await mode.combineBySql.execute.call( + createMockExecuteFunction(nodeParameters, { ...node, typeVersion: 3.1 }), + inputs, + ); + + expect(returnData.length).toEqual(1); + expect(returnData).toEqual([ + [ + { + json: { + name: 'Sam', + }, + pairedItem: [ + { + item: 0, + input: undefined, + }, + { + item: 0, + input: 1, + }, + ], + }, + { + json: { + name: 'Dan', + }, + pairedItem: [ + { + item: 1, + input: undefined, + }, + { + item: 1, + input: 1, + }, + ], + }, + { + json: { + name: 'Jon', + }, + pairedItem: [ + { + item: 2, + input: undefined, + }, + { + item: 2, + input: 1, + }, + ], + }, + ], + ]); + }); }); describe('Test MergeV3, append operation', () => { diff --git a/packages/nodes-base/nodes/Merge/test/v3/utils.test.ts b/packages/nodes-base/nodes/Merge/test/v3/utils.test.ts new file mode 100644 index 0000000000..ac08ac9bf9 --- /dev/null +++ b/packages/nodes-base/nodes/Merge/test/v3/utils.test.ts @@ -0,0 +1,92 @@ +import type { IDataObject, INodeExecutionData, IPairedItemData } from 'n8n-workflow'; + +import { modifySelectQuery, rowToExecutionData } from '../../v3/helpers/utils'; + +describe('rowToExecutionData', () => { + test('should return empty json and pairedItem when input is empty', () => { + const input: IDataObject = {}; + const result = rowToExecutionData(input); + expect(result).toEqual({ json: {}, pairedItem: [] }); + }); + + test('should separate json properties and pairedItem properties', () => { + const input: IDataObject = { + key1: 'value1', + key2: 42, + pairedItem1: { item: 0, input: undefined } as IPairedItemData, + pairedItem2: { item: 0, input: 1 } as IPairedItemData, + }; + + const expectedOutput: INodeExecutionData = { + json: { key1: 'value1', key2: 42 }, + pairedItem: [ + { item: 0, input: undefined }, + { item: 0, input: 1 }, + ], + }; + + expect(rowToExecutionData(input)).toEqual(expectedOutput); + }); + + test('should ignore undefined pairedItem values', () => { + const input: IDataObject = { + key: 'value', + pairedItem1: { item: 0, input: undefined } as IPairedItemData, + pairedItem2: undefined, + }; + + const expectedOutput: INodeExecutionData = { + json: { key: 'value' }, + pairedItem: [{ item: 0, input: undefined }], + }; + + expect(rowToExecutionData(input)).toEqual(expectedOutput); + }); + + test('should handle only json properties without pairedItem', () => { + const input: IDataObject = { + name: 'Alice', + age: 30, + }; + + const expectedOutput: INodeExecutionData = { + json: { name: 'Alice', age: 30 }, + pairedItem: [], + }; + + expect(rowToExecutionData(input)).toEqual(expectedOutput); + }); +}); + +describe('modifySelectQuery', () => { + test('should return the original query if no SELECT match is found', () => { + const query = 'UPDATE table SET column = 1'; + expect(modifySelectQuery(query, 2)).toBe(query); + }); + + test('should return the original query if SELECT * is used', () => { + const query = 'SELECT * FROM input1'; + expect(modifySelectQuery(query, 2)).toBe(query); + }); + + test('should append pairedItem columns when input tables exist', () => { + const query = 'SELECT column1, column2 FROM input1 WHERE input1.id = table.id'; + const modifiedQuery = modifySelectQuery(query, 2); + expect(modifiedQuery).toBe( + 'SELECT column1, column2, input1.pairedItem AS pairedItem1 FROM input1 WHERE input1.id = table.id', + ); + }); + + test('should handle multiple input tables correctly', () => { + const query = 'SELECT column1 FROM input1 LEFT JOIN input2 ON input1.name = input2.name'; + const modifiedQuery = modifySelectQuery(query, 2); + expect(modifiedQuery).toBe( + 'SELECT column1, input1.pairedItem AS pairedItem1, input2.pairedItem AS pairedItem2 FROM input1 LEFT JOIN input2 ON input1.name = input2.name', + ); + }); + + test('should not modify query if no input tables are found', () => { + const query = 'SELECT column1 FROM table'; + expect(modifySelectQuery(query, 2)).toBe(query); + }); +}); diff --git a/packages/nodes-base/nodes/Merge/v3/actions/mode/combineBySql.ts b/packages/nodes-base/nodes/Merge/v3/actions/mode/combineBySql.ts index bd6921082f..c53d5c4c58 100644 --- a/packages/nodes-base/nodes/Merge/v3/actions/mode/combineBySql.ts +++ b/packages/nodes-base/nodes/Merge/v3/actions/mode/combineBySql.ts @@ -1,8 +1,11 @@ +import { Container } from '@n8n/di'; import alasql from 'alasql'; import type { Database } from 'alasql'; +import { ErrorReporter } from 'n8n-core'; import type { IDataObject, IExecuteFunctions, + INode, INodeExecutionData, INodeProperties, IPairedItemData, @@ -12,6 +15,7 @@ import { NodeOperationError } from 'n8n-workflow'; import { getResolvables, updateDisplayOptions } from '@utils/utilities'; import { numberInputsProperty } from '../../helpers/descriptions'; +import { modifySelectQuery, rowToExecutionData } from '../../helpers/utils'; export const properties: INodeProperties[] = [ numberInputsProperty, @@ -39,15 +43,102 @@ const displayOptions = { export const description = updateDisplayOptions(displayOptions, properties); +const prepareError = (node: INode, error: Error) => { + let message = ''; + if (typeof error === 'string') { + message = error; + } else { + message = error.message; + } + throw new NodeOperationError(node, error, { + message: 'Issue while executing query', + description: message, + itemIndex: 0, + }); +}; + +async function executeSelectWithMappedPairedItems( + node: INode, + inputsData: INodeExecutionData[][], + query: string, +): Promise { + const returnData: INodeExecutionData[] = []; + + const db: typeof Database = new (alasql as any).Database(node.id); + + try { + for (let i = 0; i < inputsData.length; i++) { + const inputData = inputsData[i]; + + db.exec(`CREATE TABLE input${i + 1}`); + db.tables[`input${i + 1}`].data = inputData.map((entry) => ({ + ...entry.json, + pairedItem: entry.pairedItem, + })); + } + } catch (error) { + throw new NodeOperationError(node, error, { + message: 'Issue while creating table from', + description: error.message, + itemIndex: 0, + }); + } + + try { + const result: IDataObject[] = db.exec(modifySelectQuery(query, inputsData.length)); + + for (const item of result) { + if (Array.isArray(item)) { + returnData.push(...item.map((entry) => rowToExecutionData(entry))); + } else if (typeof item === 'object') { + returnData.push(rowToExecutionData(item)); + } + } + + if (!returnData.length) { + returnData.push({ json: { success: true } }); + } + } catch (error) { + prepareError(node, error as Error); + } finally { + delete alasql.databases[node.id]; + } + + return [returnData]; +} + export async function execute( this: IExecuteFunctions, inputsData: INodeExecutionData[][], ): Promise { - const nodeId = this.getNode().id; + const node = this.getNode(); const returnData: INodeExecutionData[] = []; const pairedItem: IPairedItemData[] = []; - const db: typeof Database = new (alasql as any).Database(nodeId); + let query = this.getNodeParameter('query', 0) as string; + + for (const resolvable of getResolvables(query)) { + query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string); + } + + const isSelectQuery = node.typeVersion >= 3.1 ? query.toLowerCase().startsWith('select') : false; + + if (isSelectQuery) { + try { + return await executeSelectWithMappedPairedItems(node, inputsData, query); + } catch (error) { + Container.get(ErrorReporter).error(error, { + extra: { + nodeName: node.name, + nodeType: node.type, + nodeVersion: node.typeVersion, + workflowId: this.getWorkflow().id, + }, + }); + } + } + + const db: typeof Database = new (alasql as any).Database(node.id); try { for (let i = 0; i < inputsData.length; i++) { @@ -90,7 +181,7 @@ export async function execute( db.tables[`input${i + 1}`].data = inputData.map((entry) => entry.json); } } catch (error) { - throw new NodeOperationError(this.getNode(), error, { + throw new NodeOperationError(node, error, { message: 'Issue while creating table from', description: error.message, itemIndex: 0, @@ -98,12 +189,6 @@ export async function execute( } try { - let query = this.getNodeParameter('query', 0) as string; - - for (const resolvable of getResolvables(query)) { - query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string); - } - const result: IDataObject[] = db.exec(query); for (const item of result) { @@ -118,20 +203,10 @@ export async function execute( returnData.push({ json: { success: true }, pairedItem }); } } catch (error) { - let message = ''; - if (typeof error === 'string') { - message = error; - } else { - message = error.message; - } - throw new NodeOperationError(this.getNode(), error, { - message: 'Issue while executing query', - description: message, - itemIndex: 0, - }); + prepareError(node, error as Error); + } finally { + delete alasql.databases[node.id]; } - delete alasql.databases[nodeId]; - return [returnData]; } diff --git a/packages/nodes-base/nodes/Merge/v3/actions/versionDescription.ts b/packages/nodes-base/nodes/Merge/v3/actions/versionDescription.ts index 788b45446d..501d6063e1 100644 --- a/packages/nodes-base/nodes/Merge/v3/actions/versionDescription.ts +++ b/packages/nodes-base/nodes/Merge/v3/actions/versionDescription.ts @@ -9,7 +9,7 @@ export const versionDescription: INodeTypeDescription = { name: 'merge', group: ['transform'], description: 'Merges data of multiple streams once data from both is available', - version: [3], + version: [3, 3.1], defaults: { name: 'Merge', }, diff --git a/packages/nodes-base/nodes/Merge/v3/helpers/utils.ts b/packages/nodes-base/nodes/Merge/v3/helpers/utils.ts index a8d44ce550..390c46558e 100644 --- a/packages/nodes-base/nodes/Merge/v3/helpers/utils.ts +++ b/packages/nodes-base/nodes/Merge/v3/helpers/utils.ts @@ -386,3 +386,43 @@ export function getNodeInputsData(this: IExecuteFunctions) { return returnData; } + +export const rowToExecutionData = (data: IDataObject): INodeExecutionData => { + const keys = Object.keys(data); + const pairedItem: IPairedItemData[] = []; + const json: IDataObject = {}; + + for (const key of keys) { + if (key.startsWith('pairedItem')) { + if (data[key] === undefined) continue; + pairedItem.push(data[key] as IPairedItemData); + } else { + json[key] = data[key]; + } + } + + return { json, pairedItem }; +}; + +export function modifySelectQuery(userQuery: string, inputLength: number): string { + const selectMatch = userQuery.match(/SELECT\s+(.+?)\s+FROM/i); + if (!selectMatch) return userQuery; + + let selectedColumns = selectMatch[1].trim(); + + if (selectedColumns === '*') { + return userQuery; + } + + const pairedItemColumns = []; + + for (let i = 1; i <= inputLength; i++) { + if (userQuery.includes(`input${i}`)) { + pairedItemColumns.push(`input${i}.pairedItem AS pairedItem${i}`); + } + } + + selectedColumns += pairedItemColumns.length ? ', ' + pairedItemColumns.join(', ') : ''; + + return userQuery.replace(selectMatch[0], `SELECT ${selectedColumns} FROM`); +}