From 3111dece72fdefa673de2051aaf894351128e162 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:24:00 +0200 Subject: [PATCH] perf(core): Deduplicate task runner data request response (no-changelog) (#11583) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- .../@n8n/config/src/configs/runners.config.ts | 4 + packages/@n8n/config/test/config.test.ts | 1 + .../data-request-response-reconstruct.ts | 29 ++ packages/@n8n/task-runner/src/index.ts | 1 + .../__tests__/js-task-runner.test.ts | 38 +- .../src/js-task-runner/__tests__/test-data.ts | 29 +- .../__tests__/built-ins-parser.test.ts | 20 +- .../src/js-task-runner/js-task-runner.ts | 114 +++--- packages/@n8n/task-runner/src/runner-types.ts | 4 +- packages/cli/src/runners/runner-types.ts | 12 - .../data-request-response-builder.test.ts | 338 +++--------------- .../data-request-response-stripper.test.ts | 300 ++++++++++++++++ .../data-request-response-builder.ts | 190 ++-------- .../data-request-response-stripper.ts | 131 +++++++ .../src/runners/task-managers/task-manager.ts | 33 +- 15 files changed, 672 insertions(+), 572 deletions(-) create mode 100644 packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts create mode 100644 packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts create mode 100644 packages/cli/src/runners/task-managers/data-request-response-stripper.ts diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index c7be197963..500c3337e9 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -50,4 +50,8 @@ export class TaskRunnersConfig { /** How many concurrent tasks can a runner execute at a time */ @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; + + /** Should the output of deduplication be asserted for correctness */ + @Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT') + assertDeduplicationOutput: boolean = false; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index bc10028f36..71018dee20 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -233,6 +233,7 @@ describe('GlobalConfig', () => { launcherRunner: 'javascript', maxOldSpaceSize: '', maxConcurrency: 5, + assertDeduplicationOutput: false, }, sentry: { backendDsn: '', diff --git a/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts new file mode 100644 index 0000000000..83a291a491 --- /dev/null +++ b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts @@ -0,0 +1,29 @@ +import type { IExecuteData, INodeExecutionData } from 'n8n-workflow'; + +import type { DataRequestResponse } from '@/runner-types'; + +/** + * Reconstructs data from a DataRequestResponse to the initial + * data structures. + */ +export class DataRequestResponseReconstruct { + /** + * Reconstructs `connectionInputData` from a DataRequestResponse + */ + reconstructConnectionInputData( + inputData: DataRequestResponse['inputData'], + ): INodeExecutionData[] { + return inputData?.main?.[0] ?? []; + } + + /** + * Reconstruct `executeData` from a DataRequestResponse + */ + reconstructExecuteData(response: DataRequestResponse): IExecuteData { + return { + data: response.inputData, + node: response.node, + source: response.connectionInputSource, + }; + } +} diff --git a/packages/@n8n/task-runner/src/index.ts b/packages/@n8n/task-runner/src/index.ts index bc770ea08e..5fcc6e078b 100644 --- a/packages/@n8n/task-runner/src/index.ts +++ b/packages/@n8n/task-runner/src/index.ts @@ -1,3 +1,4 @@ export * from './task-runner'; export * from './runner-types'; export * from './message-types'; +export * from './data-request/data-request-response-reconstruct'; diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index cd0863b13e..621a9c81a7 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -3,15 +3,21 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow'; import fs from 'node:fs'; import { builtinModules } from 'node:module'; +import type { JsRunnerConfig } from '@/config/js-runner-config'; +import { MainConfig } from '@/config/main-config'; +import { ExecutionError } from '@/js-task-runner/errors/execution-error'; import { ValidationError } from '@/js-task-runner/errors/validation-error'; -import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner } from '@/js-task-runner/js-task-runner'; +import type { DataRequestResponse } from '@/runner-types'; import type { Task } from '@/task-runner'; -import { newCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; -import type { JsRunnerConfig } from '../../config/js-runner-config'; -import { MainConfig } from '../../config/main-config'; -import { ExecutionError } from '../errors/execution-error'; +import { + newDataRequestResponse, + newTaskWithSettings, + withPairedItem, + wrapIntoJson, +} from './test-data'; jest.mock('ws'); @@ -68,7 +74,7 @@ describe('JsTaskRunner', () => { nodeMode: 'runOnceForAllItems', ...settings, }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)), runner, }); }; @@ -91,7 +97,7 @@ describe('JsTaskRunner', () => { nodeMode: 'runOnceForEachItem', ...settings, }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)), runner, }); }; @@ -108,7 +114,7 @@ describe('JsTaskRunner', () => { await execTaskWithParams({ task, - taskData: newCodeTaskData([wrapIntoJson({})]), + taskData: newDataRequestResponse([wrapIntoJson({})]), }); expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [ @@ -243,7 +249,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: false, isProcessAvailable: true, @@ -262,7 +268,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: true, isProcessAvailable: true, @@ -279,7 +285,7 @@ describe('JsTaskRunner', () => { code: 'return Object.values($env).concat(Object.keys($env))', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: false, isProcessAvailable: true, @@ -298,7 +304,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.N8N_RUNNERS_N8N_URI }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -313,7 +319,7 @@ describe('JsTaskRunner', () => { code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -325,7 +331,7 @@ describe('JsTaskRunner', () => { code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForEachItem', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -771,7 +777,7 @@ describe('JsTaskRunner', () => { code: 'unknown', nodeMode, }), - taskData: newCodeTaskData([wrapIntoJson({ a: 1 })]), + taskData: newDataRequestResponse([wrapIntoJson({ a: 1 })]), }), ).rejects.toThrow(ExecutionError); }, @@ -793,7 +799,7 @@ describe('JsTaskRunner', () => { jest.spyOn(runner, 'sendOffers').mockImplementation(() => {}); jest .spyOn(runner, 'requestData') - .mockResolvedValue(newCodeTaskData([wrapIntoJson({ a: 1 })])); + .mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })])); await runner.receivedSettings(taskId, task.settings); diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts index 6de3e6d2b1..224f630807 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts @@ -2,7 +2,8 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-work import { NodeConnectionType } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { DataRequestResponse } from '@/runner-types'; import type { Task } from '@/task-runner'; /** @@ -46,10 +47,10 @@ export const newTaskData = (opts: Partial & Pick }); /** - * Creates a new all code task data with the given options + * Creates a new data request response with the given options */ -export const newCodeTaskData = ( - codeNodeInputData: INodeExecutionData[], +export const newDataRequestResponse = ( + inputData: INodeExecutionData[], opts: Partial = {}, ): DataRequestResponse => { const codeNode = newNode({ @@ -83,9 +84,8 @@ export const newCodeTaskData = ( nodes: [manualTriggerNode, codeNode], }, inputData: { - main: [codeNodeInputData], + main: [inputData], }, - connectionInputData: codeNodeInputData, node: codeNode, runExecutionData: { startData: {}, @@ -95,7 +95,7 @@ export const newCodeTaskData = ( newTaskData({ source: [], data: { - main: [codeNodeInputData], + main: [inputData], }, }), ], @@ -137,14 +137,13 @@ export const newCodeTaskData = ( var: 'value', }, }, - executeData: { - node: codeNode, - data: { - main: [codeNodeInputData], - }, - source: { - main: [{ previousNode: manualTriggerNode.name }], - }, + connectionInputSource: { + main: [ + { + previousNode: 'Trigger', + previousNodeOutput: 0, + }, + ], }, ...opts, }; diff --git a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts index 399d9e6e2b..366a9188de 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts @@ -1,8 +1,13 @@ import { getAdditionalKeys } from 'n8n-core'; -import type { IDataObject, INodeType, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import type { + IDataObject, + IExecuteData, + INodeType, + IWorkflowExecuteAdditionalData, +} from 'n8n-workflow'; import { Workflow, WorkflowDataProxy } from 'n8n-workflow'; -import { newCodeTaskData } from '../../__tests__/test-data'; +import { newDataRequestResponse } from '../../__tests__/test-data'; import { BuiltInsParser } from '../built-ins-parser'; import { BuiltInsParserState } from '../built-ins-parser-state'; @@ -159,7 +164,12 @@ describe('BuiltInsParser', () => { describe('WorkflowDataProxy built-ins', () => { it('should have a known list of built-ins', () => { - const data = newCodeTaskData([]); + const data = newDataRequestResponse([]); + const executeData: IExecuteData = { + data: {}, + node: data.node, + source: data.connectionInputSource, + }; const dataProxy = new WorkflowDataProxy( new Workflow({ ...data.workflow, @@ -179,7 +189,7 @@ describe('BuiltInsParser', () => { data.runIndex, 0, data.activeNodeName, - data.connectionInputData, + [], data.siblingParameters, data.mode, getAdditionalKeys( @@ -187,7 +197,7 @@ describe('BuiltInsParser', () => { data.mode, data.runExecutionData, ), - data.executeData, + executeData, data.defaultReturnRunIndex, data.selfData, data.contextNodeName, diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index 7e79bba73b..c64d58636b 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -1,28 +1,25 @@ import { getAdditionalKeys } from 'n8n-core'; -import { - WorkflowDataProxy, - // type IWorkflowDataProxyAdditionalKeys, - Workflow, -} from 'n8n-workflow'; +import { WorkflowDataProxy, Workflow } from 'n8n-workflow'; import type { CodeExecutionMode, - INode, - ITaskDataConnections, IWorkflowExecuteAdditionalData, - WorkflowParameters, IDataObject, - IExecuteData, INodeExecutionData, INodeParameters, - IRunExecutionData, WorkflowExecuteMode, + WorkflowParameters, + ITaskDataConnections, + INode, + IRunExecutionData, EnvProviderState, + IExecuteData, INodeTypeDescription, } from 'n8n-workflow'; import * as a from 'node:assert'; import { runInNewContext, type Context } from 'node:vm'; -import type { TaskResultData } from '@/runner-types'; +import type { MainConfig } from '@/config/main-config'; +import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types'; import { type Task, TaskRunner } from '@/task-runner'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; @@ -33,7 +30,7 @@ import { makeSerializable } from './errors/serializable-error'; import type { RequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; -import type { MainConfig } from '../config/main-config'; +import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct'; export interface JSExecSettings { code: string; @@ -45,34 +42,19 @@ export interface JSExecSettings { mode: WorkflowExecuteMode; } -export interface PartialAdditionalData { - executionId?: string; - restartExecutionId?: string; - restApiUrl: string; - instanceBaseUrl: string; - formWaitingBaseUrl: string; - webhookBaseUrl: string; - webhookWaitingBaseUrl: string; - webhookTestBaseUrl: string; - currentNodeParameters?: INodeParameters; - executionTimeoutTimestamp?: number; - userId?: string; - variables: IDataObject; -} - -export interface DataRequestResponse { +export interface JsTaskData { workflow: Omit; inputData: ITaskDataConnections; + connectionInputData: INodeExecutionData[]; node: INode; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; - connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; - envProviderState?: EnvProviderState; + envProviderState: EnvProviderState; executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; @@ -89,6 +71,8 @@ export class JsTaskRunner extends TaskRunner { private readonly builtInsParser = new BuiltInsParser(); + private readonly taskDataReconstruct = new DataRequestResponseReconstruct(); + constructor(config: MainConfig, name = 'JS Task Runner') { super({ taskType: 'javascript', @@ -115,33 +99,14 @@ export class JsTaskRunner extends TaskRunner { ? neededBuiltInsResult.result : BuiltInsParserState.newNeedsAllDataState(); - const data = await this.requestData( + const dataResponse = await this.requestData( task.taskId, neededBuiltIns.toDataRequestParams(), ); - /** - * We request node types only when we know a task needs all nodes, because - * needing all nodes means that the task relies on paired item functionality, - * which is the same requirement for needing node types. - */ - if (neededBuiltIns.needsAllNodes) { - const uniqueNodeTypes = new Map( - data.workflow.nodes.map((node) => [ - `${node.type}|${node.typeVersion}`, - { name: node.type, version: node.typeVersion }, - ]), - ); + const data = this.reconstructTaskData(dataResponse); - const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); - - const nodeTypes = await this.requestNodeTypes( - task.taskId, - unknownNodeTypes, - ); - - this.nodeTypes.addNodeTypeDescriptions(nodeTypes); - } + await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); const workflowParams = data.workflow; const workflow = new Workflow({ @@ -201,7 +166,7 @@ export class JsTaskRunner extends TaskRunner { private async runForAllItems( taskId: string, settings: JSExecSettings, - data: DataRequestResponse, + data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, ): Promise { @@ -248,7 +213,7 @@ export class JsTaskRunner extends TaskRunner { private async runForEachItem( taskId: string, settings: JSExecSettings, - data: DataRequestResponse, + data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, ): Promise { @@ -315,7 +280,7 @@ export class JsTaskRunner extends TaskRunner { return returnData; } - private createDataProxy(data: DataRequestResponse, workflow: Workflow, itemIndex: number) { + private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) { return new WorkflowDataProxy( workflow, data.runExecutionData, @@ -359,4 +324,43 @@ export class JsTaskRunner extends TaskRunner { return new ExecutionError({ message: JSON.stringify(error) }); } + + private reconstructTaskData(response: DataRequestResponse): JsTaskData { + return { + ...response, + connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData( + response.inputData, + ), + executeData: this.taskDataReconstruct.reconstructExecuteData(response), + }; + } + + private async requestNodeTypeIfNeeded( + neededBuiltIns: BuiltInsParserState, + workflow: JsTaskData['workflow'], + taskId: string, + ) { + /** + * We request node types only when we know a task needs all nodes, because + * needing all nodes means that the task relies on paired item functionality, + * which is the same requirement for needing node types. + */ + if (neededBuiltIns.needsAllNodes) { + const uniqueNodeTypes = new Map( + workflow.nodes.map((node) => [ + `${node.type}|${node.typeVersion}`, + { name: node.type, version: node.typeVersion }, + ]), + ); + + const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); + + const nodeTypes = await this.requestNodeTypes( + taskId, + unknownNodeTypes, + ); + + this.nodeTypes.addNodeTypeDescriptions(nodeTypes); + } + } } diff --git a/packages/@n8n/task-runner/src/runner-types.ts b/packages/@n8n/task-runner/src/runner-types.ts index 836c42ed49..4649c2cc2f 100644 --- a/packages/@n8n/task-runner/src/runner-types.ts +++ b/packages/@n8n/task-runner/src/runner-types.ts @@ -8,6 +8,7 @@ import type { INodeParameters, IRunExecutionData, ITaskDataConnections, + ITaskDataConnectionsSource, IWorkflowExecuteAdditionalData, Workflow, WorkflowExecuteMode, @@ -29,17 +30,16 @@ export interface TaskDataRequestParams { export interface DataRequestResponse { workflow: Omit; inputData: ITaskDataConnections; + connectionInputSource: ITaskDataConnectionsSource | null; node: INode; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; - connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; envProviderState: EnvProviderState; - executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; contextNodeName: string; diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index 8fcfe968d3..b373d3051e 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -5,18 +5,6 @@ import type WebSocket from 'ws'; import type { TaskRunner } from './task-broker.service'; import type { AuthlessRequest } from '../requests'; -/** - * Specifies what data should be included for a task data request. - */ -export interface TaskDataRequestParams { - dataOfNodes: string[] | 'all'; - prevNode: boolean; - /** Whether input data for the node should be included */ - input: boolean; - /** Whether env provider's state should be included */ - env: boolean; -} - export interface DisconnectAnalyzer { determineDisconnectReason(runnerId: TaskRunner['id']): Promise; } diff --git a/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts b/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts index ea1492f64e..b8868983ed 100644 --- a/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts +++ b/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts @@ -1,42 +1,10 @@ -import type { TaskData } from '@n8n/task-runner'; +import type { PartialAdditionalData, TaskData } from '@n8n/task-runner'; import { mock } from 'jest-mock-extended'; -import type { IExecuteFunctions, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; -import { type INode, type INodeExecutionData, type Workflow } from 'n8n-workflow'; +import type { Workflow } from 'n8n-workflow'; import { DataRequestResponseBuilder } from '../data-request-response-builder'; -const triggerNode: INode = mock({ - name: 'Trigger', -}); -const debugHelperNode: INode = mock({ - name: 'DebugHelper', -}); -const codeNode: INode = mock({ - name: 'Code', -}); -const workflow: TaskData['workflow'] = mock(); -const debugHelperNodeOutItems: INodeExecutionData[] = [ - { - json: { - uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032', - email: 'Dan.Schmidt31@yahoo.com', - firstname: 'Toni', - lastname: 'Schuster', - password: 'Q!D6C2', - }, - pairedItem: { - item: 0, - }, - }, -]; -const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems; -const connectionInputData: TaskData['connectionInputData'] = codeNodeInputItems; -const envProviderState: TaskData['envProviderState'] = mock({ - env: {}, - isEnvAccessBlocked: false, - isProcessAvailable: true, -}); -const additionalData = mock({ +const additionalData = mock({ formWaitingBaseUrl: 'http://localhost:5678/form-waiting', instanceBaseUrl: 'http://localhost:5678/', restApiUrl: 'http://localhost:5678/rest', @@ -50,275 +18,57 @@ const additionalData = mock({ executionTimeoutTimestamp: undefined, restartExecutionId: undefined, }); -const executeFunctions = mock(); -/** - * Drawn with https://asciiflow.com/#/ - * Task data for an execution of the following WF: - * where ►► denotes the currently being executing node. - * ►► - * ┌───────────┐ ┌─────────────┐ ┌────────┐ - * │ Trigger ├──►│ DebugHelper ├───►│ Code │ - * └───────────┘ └─────────────┘ └────────┘ - */ -const taskData: TaskData = { - executeFunctions, - workflow, - connectionInputData, - inputData: { - main: [codeNodeInputItems], - }, - itemIndex: 0, - activeNodeName: codeNode.name, - contextNodeName: codeNode.name, - defaultReturnRunIndex: -1, - mode: 'manual', - envProviderState, - node: codeNode, - runExecutionData: { - startData: { - destinationNode: codeNode.name, - runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name], - }, - resultData: { - runData: { - [triggerNode.name]: [ - { - hints: [], - startTime: 1730313407328, - executionTime: 1, - source: [], - executionStatus: 'success', - data: { - main: [[]], - }, - }, - ], - [debugHelperNode.name]: [ - { - hints: [], - startTime: 1730313407330, - executionTime: 1, - source: [ - { - previousNode: triggerNode.name, - }, - ], - executionStatus: 'success', - data: { - main: [debugHelperNodeOutItems], - }, - }, - ], - }, - pinData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack: [], - metadata: {}, - waitingExecution: { - [codeNode.name]: { - '0': { - main: [codeNodeInputItems], - }, - }, - }, - waitingExecutionSource: { - [codeNode.name]: { - '0': { - main: [ - { - previousNode: debugHelperNode.name, - }, - ], - }, - }, - }, - }, - }, - runIndex: 0, - selfData: {}, - siblingParameters: {}, - executeData: { - node: codeNode, - data: { - main: [codeNodeInputItems], - }, - source: { - main: [ - { - previousNode: debugHelperNode.name, - previousNodeOutput: 0, - }, - ], - }, - }, +const workflow: TaskData['workflow'] = mock({ + id: '1', + name: 'Test Workflow', + active: true, + connectionsBySourceNode: {}, + nodes: {}, + pinData: {}, + settings: {}, + staticData: {}, +}); + +const taskData = mock({ additionalData, -} as const; + workflow, +}); describe('DataRequestResponseBuilder', () => { - const allDataParam: DataRequestResponseBuilder['requestParams'] = { - dataOfNodes: 'all', - env: true, - input: true, - prevNode: true, - }; + const builder = new DataRequestResponseBuilder(); - const newRequestParam = (opts: Partial) => ({ - ...allDataParam, - ...opts, - }); + it('picks only specific properties for additional data', () => { + const result = builder.buildFromTaskData(taskData); - describe('all data', () => { - it('should build the runExecutionData as is when everything is requested', () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); - - const { runExecutionData } = dataRequestResponseBuilder.build(); - - expect(runExecutionData).toStrictEqual(taskData.runExecutionData); + expect(result.additionalData).toStrictEqual({ + formWaitingBaseUrl: 'http://localhost:5678/form-waiting', + instanceBaseUrl: 'http://localhost:5678/', + restApiUrl: 'http://localhost:5678/rest', + variables: additionalData.variables, + webhookBaseUrl: 'http://localhost:5678/webhook', + webhookTestBaseUrl: 'http://localhost:5678/webhook-test', + webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', + executionId: '45844', + userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', + currentNodeParameters: undefined, + executionTimeoutTimestamp: undefined, + restartExecutionId: undefined, }); }); - describe('envProviderState', () => { - it("should filter out envProviderState when it's not requested", () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder( - taskData, - newRequestParam({ - env: false, - }), - ); + it('picks only specific properties for workflow', () => { + const result = builder.buildFromTaskData(taskData); - const result = dataRequestResponseBuilder.build(); - - expect(result.envProviderState).toStrictEqual({ - env: {}, - isEnvAccessBlocked: false, - isProcessAvailable: true, - }); - }); - }); - - describe('additionalData', () => { - it('picks only specific properties for additional data', () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); - - const result = dataRequestResponseBuilder.build(); - - expect(result.additionalData).toStrictEqual({ - formWaitingBaseUrl: 'http://localhost:5678/form-waiting', - instanceBaseUrl: 'http://localhost:5678/', - restApiUrl: 'http://localhost:5678/rest', - webhookBaseUrl: 'http://localhost:5678/webhook', - webhookTestBaseUrl: 'http://localhost:5678/webhook-test', - webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', - executionId: '45844', - userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', - currentNodeParameters: undefined, - executionTimeoutTimestamp: undefined, - restartExecutionId: undefined, - variables: additionalData.variables, - }); - }); - }); - - describe('input data', () => { - const allExceptInputParam = newRequestParam({ - input: false, - }); - - it('drops input data from executeData', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.executeData).toStrictEqual({ - node: taskData.executeData!.node, - source: taskData.executeData!.source, - data: {}, - }); - }); - - it('drops input data from result', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.inputData).toStrictEqual({}); - }); - - it('drops input data from result', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.inputData).toStrictEqual({}); - }); - - it('drops input data from connectionInputData', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.connectionInputData).toStrictEqual([]); - }); - }); - - describe('nodes', () => { - it('should return empty run data when only Code node is requested', () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: ['Code'], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({}); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it('should return empty run data when only Code node is requested', () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({}); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it("should return only DebugHelper's data when only DebugHelper node is requested", () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({ - [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], - }); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it("should return DebugHelper's data when only prevNode node is requested", () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [], prevNode: true }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({ - [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], - }); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); + expect(result.workflow).toStrictEqual({ + id: '1', + name: 'Test Workflow', + active: true, + connections: workflow.connectionsBySourceNode, + nodes: [], + pinData: workflow.pinData, + settings: workflow.settings, + staticData: workflow.staticData, }); }); }); diff --git a/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts new file mode 100644 index 0000000000..a37b9bdc7a --- /dev/null +++ b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts @@ -0,0 +1,300 @@ +import type { DataRequestResponse, TaskDataRequestParams } from '@n8n/task-runner'; +import { mock } from 'jest-mock-extended'; +import type { IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import { type INode, type INodeExecutionData } from 'n8n-workflow'; + +import { DataRequestResponseStripper } from '../data-request-response-stripper'; + +const triggerNode: INode = mock({ + name: 'Trigger', +}); +const debugHelperNode: INode = mock({ + name: 'DebugHelper', +}); +const codeNode: INode = mock({ + name: 'Code', +}); +const workflow: DataRequestResponse['workflow'] = mock(); +const debugHelperNodeOutItems: INodeExecutionData[] = [ + { + json: { + uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032', + email: 'Dan.Schmidt31@yahoo.com', + firstname: 'Toni', + lastname: 'Schuster', + password: 'Q!D6C2', + }, + pairedItem: { + item: 0, + }, + }, +]; +const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems; +const envProviderState: DataRequestResponse['envProviderState'] = mock< + DataRequestResponse['envProviderState'] +>({ + env: {}, + isEnvAccessBlocked: false, + isProcessAvailable: true, +}); +const additionalData = mock({ + formWaitingBaseUrl: 'http://localhost:5678/form-waiting', + instanceBaseUrl: 'http://localhost:5678/', + restApiUrl: 'http://localhost:5678/rest', + variables: {}, + webhookBaseUrl: 'http://localhost:5678/webhook', + webhookTestBaseUrl: 'http://localhost:5678/webhook-test', + webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', + executionId: '45844', + userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', + currentNodeParameters: undefined, + executionTimeoutTimestamp: undefined, + restartExecutionId: undefined, +}); + +/** + * Drawn with https://asciiflow.com/#/ + * Task data for an execution of the following WF: + * where ►► denotes the currently being executing node. + * ►► + * ┌───────────┐ ┌─────────────┐ ┌────────┐ + * │ Trigger ├──►│ DebugHelper ├───►│ Code │ + * └───────────┘ └─────────────┘ └────────┘ + */ +const taskData: DataRequestResponse = { + workflow, + inputData: { + main: [codeNodeInputItems], + }, + itemIndex: 0, + activeNodeName: codeNode.name, + contextNodeName: codeNode.name, + defaultReturnRunIndex: -1, + mode: 'manual', + envProviderState, + node: codeNode, + runExecutionData: { + startData: { + destinationNode: codeNode.name, + runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name], + }, + resultData: { + runData: { + [triggerNode.name]: [ + { + hints: [], + startTime: 1730313407328, + executionTime: 1, + source: [], + executionStatus: 'success', + data: { + main: [[]], + }, + }, + ], + [debugHelperNode.name]: [ + { + hints: [], + startTime: 1730313407330, + executionTime: 1, + source: [ + { + previousNode: triggerNode.name, + }, + ], + executionStatus: 'success', + data: { + main: [debugHelperNodeOutItems], + }, + }, + ], + }, + pinData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: { + [codeNode.name]: { + '0': { + main: [codeNodeInputItems], + }, + }, + }, + waitingExecutionSource: { + [codeNode.name]: { + '0': { + main: [ + { + previousNode: debugHelperNode.name, + }, + ], + }, + }, + }, + }, + }, + runIndex: 0, + selfData: {}, + siblingParameters: {}, + connectionInputSource: { + main: [ + { + previousNode: debugHelperNode.name, + previousNodeOutput: 0, + }, + ], + }, + additionalData, +} as const; + +describe('DataRequestResponseStripper', () => { + const allDataParam: TaskDataRequestParams = { + dataOfNodes: 'all', + env: true, + input: true, + prevNode: true, + }; + + const newRequestParam = (opts: Partial) => ({ + ...allDataParam, + ...opts, + }); + + describe('all data', () => { + it('should build the runExecutionData as is when everything is requested', () => { + const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam); + + const { runExecutionData } = dataRequestResponseBuilder.strip(); + + expect(runExecutionData).toStrictEqual(taskData.runExecutionData); + }); + }); + + describe('envProviderState', () => { + it("should filter out envProviderState when it's not requested", () => { + const dataRequestResponseBuilder = new DataRequestResponseStripper( + taskData, + newRequestParam({ + env: false, + }), + ); + + const result = dataRequestResponseBuilder.strip(); + + expect(result.envProviderState).toStrictEqual({ + env: {}, + isEnvAccessBlocked: false, + isProcessAvailable: true, + }); + }); + }); + + describe('input data', () => { + const allExceptInputParam = newRequestParam({ + input: false, + }); + + it('drops input data from result', () => { + const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip(); + + expect(result.inputData).toStrictEqual({}); + }); + + it('drops input data from result', () => { + const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip(); + + expect(result.inputData).toStrictEqual({}); + }); + }); + + describe('nodes', () => { + it('should return empty run data when only Code node is requested', () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: ['Code'], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({}); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it('should return empty run data when only Code node is requested', () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({}); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it("should return only DebugHelper's data when only DebugHelper node is requested", () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({ + [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], + }); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it("should return DebugHelper's data when only prevNode node is requested", () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [], prevNode: true }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({ + [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], + }); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + }); + + describe('passthrough properties', () => { + test.each>([ + ['workflow'], + ['connectionInputSource'], + ['node'], + ['runIndex'], + ['itemIndex'], + ['activeNodeName'], + ['siblingParameters'], + ['mode'], + ['defaultReturnRunIndex'], + ['selfData'], + ['contextNodeName'], + ['additionalData'], + ])("it doesn't change %s", (propertyName) => { + const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam); + + const result = dataRequestResponseBuilder.strip(); + + expect(result[propertyName]).toBe(taskData[propertyName]); + }); + }); +}); diff --git a/packages/cli/src/runners/task-managers/data-request-response-builder.ts b/packages/cli/src/runners/task-managers/data-request-response-builder.ts index bc498c33a7..7df3b9e012 100644 --- a/packages/cli/src/runners/task-managers/data-request-response-builder.ts +++ b/packages/cli/src/runners/task-managers/data-request-response-builder.ts @@ -1,63 +1,30 @@ -import type { - DataRequestResponse, - BrokerMessage, - PartialAdditionalData, - TaskData, -} from '@n8n/task-runner'; -import type { - EnvProviderState, - IExecuteData, - INodeExecutionData, - IPinData, - IRunData, - IRunExecutionData, - ITaskDataConnections, - IWorkflowExecuteAdditionalData, - Workflow, - WorkflowParameters, -} from 'n8n-workflow'; +import type { DataRequestResponse, PartialAdditionalData, TaskData } from '@n8n/task-runner'; +import type { IWorkflowExecuteAdditionalData, Workflow, WorkflowParameters } from 'n8n-workflow'; /** - * Builds the response to a data request coming from a Task Runner. Tries to minimize - * the amount of data that is sent to the runner by only providing what is requested. + * Transforms TaskData to DataRequestResponse. The main purpose of the + * transformation is to make sure there is no duplication in the data + * (e.g. connectionInputData and executeData.data can be derived from + * inputData). */ export class DataRequestResponseBuilder { - private requestedNodeNames = new Set(); - - constructor( - private readonly taskData: TaskData, - private readonly requestParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'], - ) { - this.requestedNodeNames = new Set(requestParams.dataOfNodes); - - if (this.requestParams.prevNode && this.requestParams.dataOfNodes !== 'all') { - this.requestedNodeNames.add(this.determinePrevNodeName()); - } - } - - /** - * Builds a response to the data request - */ - build(): DataRequestResponse { - const { taskData: td } = this; - + buildFromTaskData(taskData: TaskData): DataRequestResponse { return { - workflow: this.buildWorkflow(td.workflow), - connectionInputData: this.buildConnectionInputData(td.connectionInputData), - inputData: this.buildInputData(td.inputData), - itemIndex: td.itemIndex, - activeNodeName: td.activeNodeName, - contextNodeName: td.contextNodeName, - defaultReturnRunIndex: td.defaultReturnRunIndex, - mode: td.mode, - envProviderState: this.buildEnvProviderState(td.envProviderState), - node: td.node, // The current node being executed - runExecutionData: this.buildRunExecutionData(td.runExecutionData), - runIndex: td.runIndex, - selfData: td.selfData, - siblingParameters: td.siblingParameters, - executeData: this.buildExecuteData(td.executeData), - additionalData: this.buildAdditionalData(td.additionalData), + workflow: this.buildWorkflow(taskData.workflow), + inputData: taskData.inputData, + connectionInputSource: taskData.executeData?.source ?? null, + itemIndex: taskData.itemIndex, + activeNodeName: taskData.activeNodeName, + contextNodeName: taskData.contextNodeName, + defaultReturnRunIndex: taskData.defaultReturnRunIndex, + mode: taskData.mode, + envProviderState: taskData.envProviderState, + node: taskData.node, + runExecutionData: taskData.runExecutionData, + runIndex: taskData.runIndex, + selfData: taskData.selfData, + siblingParameters: taskData.siblingParameters, + additionalData: this.buildAdditionalData(taskData.additionalData), }; } @@ -80,86 +47,6 @@ export class DataRequestResponseBuilder { }; } - private buildExecuteData(executeData: IExecuteData | undefined): IExecuteData | undefined { - if (executeData === undefined) { - return undefined; - } - - return { - node: executeData.node, // The current node being executed - data: this.requestParams.input ? executeData.data : {}, - source: executeData.source, - }; - } - - private buildRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData { - if (this.requestParams.dataOfNodes === 'all') { - return runExecutionData; - } - - return { - startData: runExecutionData.startData, - resultData: { - error: runExecutionData.resultData.error, - lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted, - metadata: runExecutionData.resultData.metadata, - runData: this.buildRunData(runExecutionData.resultData.runData), - pinData: this.buildPinData(runExecutionData.resultData.pinData), - }, - executionData: runExecutionData.executionData - ? { - // TODO: Figure out what these two are and can they be filtered - contextData: runExecutionData.executionData?.contextData, - nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack, - - metadata: runExecutionData.executionData.metadata, - waitingExecution: runExecutionData.executionData.waitingExecution, - waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource, - } - : undefined, - }; - } - - private buildRunData(runData: IRunData): IRunData { - return this.filterObjectByNodeNames(runData); - } - - private buildPinData(pinData: IPinData | undefined): IPinData | undefined { - return pinData ? this.filterObjectByNodeNames(pinData) : undefined; - } - - private buildEnvProviderState(envProviderState: EnvProviderState): EnvProviderState { - if (this.requestParams.env) { - // In case `isEnvAccessBlocked` = true, the provider state has already sanitized - // the environment variables and we can return it as is. - return envProviderState; - } - - return { - env: {}, - isEnvAccessBlocked: envProviderState.isEnvAccessBlocked, - isProcessAvailable: envProviderState.isProcessAvailable, - }; - } - - private buildInputData(inputData: ITaskDataConnections): ITaskDataConnections { - if (this.requestParams.input) { - return inputData; - } - - return {}; - } - - private buildConnectionInputData( - connectionInputData: INodeExecutionData[], - ): INodeExecutionData[] { - if (this.requestParams.input) { - return connectionInputData; - } - - return []; - } - private buildWorkflow(workflow: Workflow): Omit { return { id: workflow.id, @@ -172,37 +59,4 @@ export class DataRequestResponseBuilder { staticData: workflow.staticData, }; } - - /** - * Assuming the given `obj` is an object where the keys are node names, - * filters the object to only include the node names that are requested. - */ - private filterObjectByNodeNames>(obj: T): T { - if (this.requestParams.dataOfNodes === 'all') { - return obj; - } - - const filteredObj: T = {} as T; - - for (const nodeName in obj) { - if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) { - continue; - } - - if (this.requestedNodeNames.has(nodeName)) { - filteredObj[nodeName] = obj[nodeName]; - } - } - - return filteredObj; - } - - private determinePrevNodeName(): string { - const sourceData = this.taskData.executeData?.source?.main?.[0]; - if (!sourceData) { - return ''; - } - - return sourceData.previousNode; - } } diff --git a/packages/cli/src/runners/task-managers/data-request-response-stripper.ts b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts new file mode 100644 index 0000000000..b924a87c5f --- /dev/null +++ b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts @@ -0,0 +1,131 @@ +import type { DataRequestResponse, BrokerMessage } from '@n8n/task-runner'; +import type { + EnvProviderState, + IPinData, + IRunData, + IRunExecutionData, + ITaskDataConnections, +} from 'n8n-workflow'; + +/** + * Strips data from data request response based on the specified parameters + */ +export class DataRequestResponseStripper { + private requestedNodeNames = new Set(); + + constructor( + private readonly dataResponse: DataRequestResponse, + private readonly stripParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'], + ) { + this.requestedNodeNames = new Set(stripParams.dataOfNodes); + + if (this.stripParams.prevNode && this.stripParams.dataOfNodes !== 'all') { + this.requestedNodeNames.add(this.determinePrevNodeName()); + } + } + + /** + * Builds a response to the data request + */ + strip(): DataRequestResponse { + const { dataResponse: dr } = this; + + return { + ...dr, + inputData: this.stripInputData(dr.inputData), + envProviderState: this.stripEnvProviderState(dr.envProviderState), + runExecutionData: this.stripRunExecutionData(dr.runExecutionData), + }; + } + + private stripRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData { + if (this.stripParams.dataOfNodes === 'all') { + return runExecutionData; + } + + return { + startData: runExecutionData.startData, + resultData: { + error: runExecutionData.resultData.error, + lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted, + metadata: runExecutionData.resultData.metadata, + runData: this.stripRunData(runExecutionData.resultData.runData), + pinData: this.stripPinData(runExecutionData.resultData.pinData), + }, + executionData: runExecutionData.executionData + ? { + // TODO: Figure out what these two are and can they be stripped + contextData: runExecutionData.executionData?.contextData, + nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack, + + metadata: runExecutionData.executionData.metadata, + waitingExecution: runExecutionData.executionData.waitingExecution, + waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource, + } + : undefined, + }; + } + + private stripRunData(runData: IRunData): IRunData { + return this.filterObjectByNodeNames(runData); + } + + private stripPinData(pinData: IPinData | undefined): IPinData | undefined { + return pinData ? this.filterObjectByNodeNames(pinData) : undefined; + } + + private stripEnvProviderState(envProviderState: EnvProviderState): EnvProviderState { + if (this.stripParams.env) { + // In case `isEnvAccessBlocked` = true, the provider state has already sanitized + // the environment variables and we can return it as is. + return envProviderState; + } + + return { + env: {}, + isEnvAccessBlocked: envProviderState.isEnvAccessBlocked, + isProcessAvailable: envProviderState.isProcessAvailable, + }; + } + + private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections { + if (this.stripParams.input) { + return inputData; + } + + return {}; + } + + /** + * Assuming the given `obj` is an object where the keys are node names, + * filters the object to only include the node names that are requested. + */ + private filterObjectByNodeNames>(obj: T): T { + if (this.stripParams.dataOfNodes === 'all') { + return obj; + } + + const filteredObj: T = {} as T; + + for (const nodeName in obj) { + if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) { + continue; + } + + if (this.requestedNodeNames.has(nodeName)) { + filteredObj[nodeName] = obj[nodeName]; + } + } + + return filteredObj; + } + + private determinePrevNodeName(): string { + const sourceData = this.dataResponse.connectionInputSource?.main?.[0]; + if (!sourceData) { + return ''; + } + + return sourceData.previousNode; + } +} diff --git a/packages/cli/src/runners/task-managers/task-manager.ts b/packages/cli/src/runners/task-managers/task-manager.ts index 21d4e8eb9d..ffc86cdf62 100644 --- a/packages/cli/src/runners/task-managers/task-manager.ts +++ b/packages/cli/src/runners/task-managers/task-manager.ts @@ -1,5 +1,6 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; -import { RPC_ALLOW_LIST } from '@n8n/task-runner'; +import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner'; import type { EnvProviderState, IExecuteFunctions, @@ -17,11 +18,13 @@ import type { } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import { Service } from 'typedi'; +import * as a from 'node:assert/strict'; +import Container, { Service } from 'typedi'; import { NodeTypes } from '@/node-types'; import { DataRequestResponseBuilder } from './data-request-response-builder'; +import { DataRequestResponseStripper } from './data-request-response-stripper'; export type RequestAccept = (jobId: string) => void; export type RequestReject = (reason: string) => void; @@ -56,6 +59,10 @@ export abstract class TaskManager { tasks: Map = new Map(); + private readonly runnerConfig = Container.get(TaskRunnersConfig); + + private readonly dataResponseBuilder = new DataRequestResponseBuilder(); + constructor(private readonly nodeTypes: NodeTypes) {} async startTask( @@ -237,14 +244,30 @@ export abstract class TaskManager { return; } - const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams); - const requestedData = dataRequestResponseBuilder.build(); + const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data); + + if (this.runnerConfig.assertDeduplicationOutput) { + const reconstruct = new DataRequestResponseReconstruct(); + a.deepStrictEqual( + reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData), + job.data.connectionInputData, + ); + a.deepStrictEqual( + reconstruct.reconstructExecuteData(dataRequestResponse), + job.data.executeData, + ); + } + + const strippedData = new DataRequestResponseStripper( + dataRequestResponse, + requestParams, + ).strip(); this.sendMessage({ type: 'requester:taskdataresponse', taskId, requestId, - data: requestedData, + data: strippedData, }); }