diff --git a/packages/core/src/CreateNodeAsTool.ts b/packages/core/src/CreateNodeAsTool.ts index 1466f57d09..c569f943ce 100644 --- a/packages/core/src/CreateNodeAsTool.ts +++ b/packages/core/src/CreateNodeAsTool.ts @@ -1,5 +1,10 @@ import { DynamicStructuredTool } from '@langchain/core/tools'; -import type { IExecuteFunctions, INodeParameters, INodeType } from 'n8n-workflow'; +import type { + IExecuteFunctions, + INodeParameters, + INodeType, + ISupplyDataFunctions, +} from 'n8n-workflow'; import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import { z } from 'zod'; @@ -18,13 +23,13 @@ interface FromAIArgument { * generating Zod schemas, and creating LangChain tools. */ class AIParametersParser { - private ctx: IExecuteFunctions; + private ctx: ISupplyDataFunctions; /** * Constructs an instance of AIParametersParser. * @param ctx The execution context. */ - constructor(ctx: IExecuteFunctions) { + constructor(ctx: ISupplyDataFunctions) { this.ctx = ctx; } @@ -388,7 +393,7 @@ class AIParametersParser { try { // Execute the node with the proxied context - const result = await node.execute?.bind(this.ctx)(); + const result = await node.execute?.bind(this.ctx as IExecuteFunctions)(); // Process and map the results const mappedResults = result?.[0]?.flatMap((item) => item.json); @@ -423,7 +428,7 @@ class AIParametersParser { * @returns An object containing the DynamicStructuredTool instance. */ export function createNodeAsTool( - ctx: IExecuteFunctions, + ctx: ISupplyDataFunctions, node: INodeType, nodeParameters: INodeParameters, ) { diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 29f60b8979..807daab46b 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -173,6 +173,7 @@ import { ExecuteSingleContext, HookContext, PollContext, + SupplyDataContext, TriggerContext, WebhookContext, } from './node-execution-context'; @@ -2714,7 +2715,7 @@ export function getWebhookDescription( } // TODO: Change options to an object -const addExecutionDataFunctions = async ( +export const addExecutionDataFunctions = async ( type: 'input' | 'output', nodeName: string, data: INodeExecutionData[][] | ExecutionBaseError, @@ -2880,25 +2881,23 @@ export async function getInputConnectionData( connectedNode.type, connectedNode.typeVersion, ); - - // eslint-disable-next-line @typescript-eslint/no-use-before-define - const context = getSupplyDataFunctions( + const context = new SupplyDataContext( workflow, + connectedNode, + additionalData, + mode, runExecutionData, runIndex, connectionInputData, inputData, - connectedNode, - additionalData, executeData, - mode, closeFunctions, abortSignal, ); if (!nodeType.supplyData) { if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) { - nodeType.supplyData = async function (this: IExecuteFunctions) { + nodeType.supplyData = async function (this: ISupplyDataFunctions) { return createNodeAsTool(this, nodeType, this.getNode().parameters); }; } else { @@ -3942,277 +3941,6 @@ export function getExecuteFunctions( })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } -export function getSupplyDataFunctions( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - closeFunctions: CloseFunction[], - abortSignal?: AbortSignal, -): ISupplyDataFunctions { - return { - ...getCommonWorkflowFunctions(workflow, node, additionalData), - ...executionCancellationFunctions(abortSignal), - getMode: () => mode, - getCredentials: async (type, itemIndex) => - await getCredentials( - workflow, - node, - type, - additionalData, - mode, - executeData, - runExecutionData, - runIndex, - connectionInputData, - itemIndex, - ), - continueOnFail: () => continueOnFail(node), - evaluateExpression: (expression: string, itemIndex: number) => - workflow.expression.resolveSimpleParameterValue( - `=${expression}`, - {}, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ), - executeWorkflow: async ( - workflowInfo: IExecuteWorkflowInfo, - inputData?: INodeExecutionData[], - parentCallbackManager?: CallbackManager, - options?: { - doNotWaitToFinish?: boolean; - parentExecution?: RelatedExecution; - }, - ): Promise => - await additionalData - .executeWorkflow(workflowInfo, additionalData, { - parentWorkflowId: workflow.id?.toString(), - inputData, - parentWorkflowSettings: workflow.settings, - node, - parentCallbackManager, - ...options, - }) - .then(async (result) => { - const data = await Container.get(BinaryDataService).duplicateBinaryData( - workflow.id, - additionalData.executionId!, - result.data, - ); - return { ...result, data }; - }), - getNodeOutputs() { - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => { - if (typeof output === 'string') { - return { - type: output, - }; - } - return output; - }); - }, - async getInputConnectionData( - inputName: NodeConnectionType, - itemIndex: number, - ): Promise { - return await getInputConnectionData.call( - this, - workflow, - runExecutionData, - runIndex, - connectionInputData, - inputData, - additionalData, - executeData, - mode, - closeFunctions, - inputName, - itemIndex, - abortSignal, - ); - }, - getInputData: (inputIndex = 0, inputName = 'main') => { - if (!inputData.hasOwnProperty(inputName)) { - // Return empty array because else it would throw error when nothing is connected to input - return []; - } - - // TODO: Check if nodeType has input with that index defined - if (inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (inputData[inputName][inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return inputData[inputName][inputIndex]; - }, - getNodeParameter: (( - parameterName: string, - itemIndex: number, - fallbackValue?: any, - options?: IGetNodeParameterOptions, - ) => - getNodeParameter( - workflow, - runExecutionData, - runIndex, - connectionInputData, - node, - parameterName, - itemIndex, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - fallbackValue, - options, - )) as ISupplyDataFunctions['getNodeParameter'], - getWorkflowDataProxy: (itemIndex: number) => - new WorkflowDataProxy( - workflow, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - {}, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ).getDataProxy(), - sendMessageToUI(...args: any[]): void { - if (mode !== 'manual') { - return; - } - try { - if (additionalData.sendDataToUI) { - args = args.map((arg) => { - // prevent invalid dates from being logged as null - if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; - - // log valid dates in human readable format, as in browser - if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); - if (arg instanceof Date) return arg.toString(); - - return arg; - }); - - additionalData.sendDataToUI('sendConsoleMessage', { - source: `[Node: "${node.name}"]`, - messages: args, - }); - } - } catch (error) { - Logger.warn(`There was a problem sending message to UI: ${error.message}`); - } - }, - logAiEvent: (eventName: AiEvent, msg: string) => - additionalData.logAiEvent(eventName, { - executionId: additionalData.executionId ?? 'unsaved-execution', - nodeName: node.name, - workflowName: workflow.name ?? 'Unnamed workflow', - nodeType: node.type, - workflowId: workflow.id ?? 'unsaved-workflow', - msg, - }), - addInputData( - connectionType: NodeConnectionType, - data: INodeExecutionData[][], - ): { index: number } { - const nodeName = this.getNode().name; - let currentNodeRunIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length; - } - - addExecutionDataFunctions( - 'input', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging input data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - - return { index: currentNodeRunIndex }; - }, - addOutputData( - connectionType: NodeConnectionType, - currentNodeRunIndex: number, - data: INodeExecutionData[][], - metadata?: ITaskMetadata, - ): void { - addExecutionDataFunctions( - 'output', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - metadata, - ).catch((error) => { - Logger.warn( - `There was a problem logging output data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - }, - helpers: { - createDeferredPromise, - copyInputItems, - ...getRequestHelperFunctions( - workflow, - node, - additionalData, - runExecutionData, - connectionInputData, - ), - ...getSSHTunnelFunctions(), - ...getFileSystemHelperFunctions(node), - ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getCheckProcessedHelperFunctions(workflow, node), - assertBinaryData: (itemIndex, propertyName) => - assertBinaryData(inputData, node, itemIndex, propertyName, 0), - getBinaryDataBuffer: async (itemIndex, propertyName) => - await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), - - returnJsonArray, - normalizeItems, - constructExecutionMetaData, - }, - }; -} - /** * Returns the execute functions regular nodes have access to when single-function is defined. */ diff --git a/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts index 8a2fc72be3..bc868f6c07 100644 --- a/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts +++ b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts @@ -302,20 +302,6 @@ describe('ExecuteSingleContext', () => { describe('setMetadata', () => { it('sets metadata on execution data', () => { - const context = new ExecuteSingleContext( - workflow, - node, - additionalData, - mode, - runExecutionData, - runIndex, - connectionInputData, - inputData, - itemIndex, - executeData, - abortSignal, - ); - const metadata: ITaskMetadata = { subExecution: { workflowId: '123', @@ -323,9 +309,11 @@ describe('ExecuteSingleContext', () => { }, }; - expect(context.getExecuteData().metadata?.subExecution).toEqual(undefined); - context.setMetadata(metadata); - expect(context.getExecuteData().metadata?.subExecution).toEqual(metadata.subExecution); + expect(executeSingleContext.getExecuteData().metadata?.subExecution).toEqual(undefined); + executeSingleContext.setMetadata(metadata); + expect(executeSingleContext.getExecuteData().metadata?.subExecution).toEqual( + metadata.subExecution, + ); }); }); }); diff --git a/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts b/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts new file mode 100644 index 0000000000..a60517e5dd --- /dev/null +++ b/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts @@ -0,0 +1,240 @@ +import { mock } from 'jest-mock-extended'; +import type { + INode, + IWorkflowExecuteAdditionalData, + IRunExecutionData, + INodeExecutionData, + ITaskDataConnections, + IExecuteData, + Workflow, + WorkflowExecuteMode, + ICredentialsHelper, + Expression, + INodeType, + INodeTypes, + OnError, + ICredentialDataDecryptedObject, +} from 'n8n-workflow'; +import { ApplicationError } from 'n8n-workflow'; + +import { SupplyDataContext } from '../supply-data-context'; + +describe('SupplyDataContext', () => { + const testCredentialType = 'testCredential'; + const nodeType = mock({ + description: { + credentials: [ + { + name: testCredentialType, + required: true, + }, + ], + properties: [ + { + name: 'testParameter', + required: true, + }, + ], + }, + }); + const nodeTypes = mock(); + const expression = mock(); + const workflow = mock({ expression, nodeTypes }); + const node = mock({ + credentials: { + [testCredentialType]: { + id: 'testCredentialId', + }, + }, + }); + node.parameters = { + testParameter: 'testValue', + }; + const credentialsHelper = mock(); + const additionalData = mock({ credentialsHelper }); + const mode: WorkflowExecuteMode = 'manual'; + const runExecutionData = mock(); + const connectionInputData = mock(); + const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] }; + const executeData = mock(); + const runIndex = 0; + const closeFn = jest.fn(); + const abortSignal = mock(); + + const supplyDataContext = new SupplyDataContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executeData, + [closeFn], + abortSignal, + ); + + beforeEach(() => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + expression.getParameterValue.mockImplementation((value) => value); + }); + + describe('getExecutionCancelSignal', () => { + it('should return the abort signal', () => { + expect(supplyDataContext.getExecutionCancelSignal()).toBe(abortSignal); + }); + }); + + describe('continueOnFail', () => { + afterEach(() => { + node.onError = undefined; + node.continueOnFail = false; + }); + + it('should return false for nodes by default', () => { + expect(supplyDataContext.continueOnFail()).toEqual(false); + }); + + it('should return true if node has continueOnFail set to true', () => { + node.continueOnFail = true; + expect(supplyDataContext.continueOnFail()).toEqual(true); + }); + + test.each([ + ['continueRegularOutput', true], + ['continueErrorOutput', true], + ['stopWorkflow', false], + ])('if node has onError set to %s, it should return %s', (onError, expected) => { + node.onError = onError as OnError; + expect(supplyDataContext.continueOnFail()).toEqual(expected); + }); + }); + + describe('evaluateExpression', () => { + it('should evaluate the expression correctly', () => { + const expression = '$json.test'; + const expectedResult = 'data'; + const resolveSimpleParameterValueSpy = jest.spyOn( + workflow.expression, + 'resolveSimpleParameterValue', + ); + resolveSimpleParameterValueSpy.mockReturnValue(expectedResult); + + expect(supplyDataContext.evaluateExpression(expression, 0)).toEqual(expectedResult); + + expect(resolveSimpleParameterValueSpy).toHaveBeenCalledWith( + `=${expression}`, + {}, + runExecutionData, + runIndex, + 0, + node.name, + connectionInputData, + mode, + expect.objectContaining({}), + executeData, + ); + + resolveSimpleParameterValueSpy.mockRestore(); + }); + }); + + describe('getInputData', () => { + const inputIndex = 0; + const inputName = 'main'; + + afterEach(() => { + inputData[inputName] = [[{ json: { test: 'data' } }]]; + }); + + it('should return the input data correctly', () => { + const expectedData = [{ json: { test: 'data' } }]; + + expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + }); + + it('should return an empty array if the input name does not exist', () => { + const inputName = 'nonExistent'; + expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual([]); + }); + + it('should throw an error if the input index is out of range', () => { + const inputIndex = 2; + + expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + }); + + it('should throw an error if the input index was not set', () => { + inputData.main[inputIndex] = null; + + expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + }); + }); + + describe('getNodeParameter', () => { + beforeEach(() => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + expression.getParameterValue.mockImplementation((value) => value); + }); + + it('should return parameter value when it exists', () => { + const parameter = supplyDataContext.getNodeParameter('testParameter', 0); + + expect(parameter).toBe('testValue'); + }); + + it('should return the fallback value when the parameter does not exist', () => { + const parameter = supplyDataContext.getNodeParameter('otherParameter', 0, 'fallback'); + + expect(parameter).toBe('fallback'); + }); + }); + + describe('getCredentials', () => { + it('should get decrypted credentials', async () => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' }); + + const credentials = await supplyDataContext.getCredentials( + testCredentialType, + 0, + ); + + expect(credentials).toEqual({ secret: 'token' }); + }); + }); + + describe('getWorkflowDataProxy', () => { + it('should return the workflow data proxy correctly', () => { + const workflowDataProxy = supplyDataContext.getWorkflowDataProxy(0); + expect(workflowDataProxy.isProxy).toBe(true); + expect(Object.keys(workflowDataProxy.$input)).toEqual([ + 'all', + 'context', + 'first', + 'item', + 'last', + 'params', + ]); + }); + }); + + describe('logAiEvent', () => { + it('should log the AI event correctly', () => { + const eventName = 'ai-tool-called'; + const msg = 'test message'; + + supplyDataContext.logAiEvent(eventName, msg); + + expect(additionalData.logAiEvent).toHaveBeenCalledWith(eventName, { + executionId: additionalData.executionId, + nodeName: node.name, + workflowName: workflow.name, + nodeType: node.type, + workflowId: workflow.id, + msg, + }); + }); + }); +}); diff --git a/packages/core/src/node-execution-context/index.ts b/packages/core/src/node-execution-context/index.ts index d5c663b2ab..8d281f1ed3 100644 --- a/packages/core/src/node-execution-context/index.ts +++ b/packages/core/src/node-execution-context/index.ts @@ -3,5 +3,6 @@ export { ExecuteSingleContext } from './execute-single-context'; export { HookContext } from './hook-context'; export { LoadOptionsContext } from './load-options-context'; export { PollContext } from './poll-context'; +export { SupplyDataContext } from './supply-data-context'; export { TriggerContext } from './trigger-context'; export { WebhookContext } from './webhook-context'; diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts new file mode 100644 index 0000000000..de59a1c6d4 --- /dev/null +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -0,0 +1,371 @@ +import type { + AiEvent, + CallbackManager, + CloseFunction, + ExecuteWorkflowData, + ICredentialDataDecryptedObject, + IExecuteData, + IExecuteWorkflowInfo, + IGetNodeParameterOptions, + INode, + INodeExecutionData, + IRunExecutionData, + ISupplyDataFunctions, + ITaskDataConnections, + ITaskMetadata, + IWorkflowExecuteAdditionalData, + NodeConnectionType, + RelatedExecution, + Workflow, + WorkflowExecuteMode, +} from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + NodeHelpers, + WorkflowDataProxy, +} from 'n8n-workflow'; +import Container from 'typedi'; + +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; +// eslint-disable-next-line import/no-cycle +import { + assertBinaryData, + continueOnFail, + constructExecutionMetaData, + copyInputItems, + getAdditionalKeys, + getBinaryDataBuffer, + getBinaryHelperFunctions, + getCheckProcessedHelperFunctions, + getCredentials, + getFileSystemHelperFunctions, + getNodeParameter, + getRequestHelperFunctions, + getSSHTunnelFunctions, + normalizeItems, + returnJsonArray, + getInputConnectionData, + addExecutionDataFunctions, +} from '@/NodeExecuteFunctions'; + +import { NodeExecutionContext } from './node-execution-context'; + +export class SupplyDataContext extends NodeExecutionContext implements ISupplyDataFunctions { + readonly helpers: ISupplyDataFunctions['helpers']; + + readonly getNodeParameter: ISupplyDataFunctions['getNodeParameter']; + + constructor( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + private readonly runExecutionData: IRunExecutionData, + private readonly runIndex: number, + private readonly connectionInputData: INodeExecutionData[], + private readonly inputData: ITaskDataConnections, + private readonly executeData: IExecuteData, + private readonly closeFunctions: CloseFunction[], + private readonly abortSignal?: AbortSignal, + ) { + super(workflow, node, additionalData, mode); + + this.helpers = { + createDeferredPromise, + copyInputItems, + ...getRequestHelperFunctions( + workflow, + node, + additionalData, + runExecutionData, + connectionInputData, + ), + ...getSSHTunnelFunctions(), + ...getFileSystemHelperFunctions(node), + ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getCheckProcessedHelperFunctions(workflow, node), + assertBinaryData: (itemIndex, propertyName) => + assertBinaryData(inputData, node, itemIndex, propertyName, 0), + getBinaryDataBuffer: async (itemIndex, propertyName) => + await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), + + returnJsonArray, + normalizeItems, + constructExecutionMetaData, + }; + + this.getNodeParameter = (( + parameterName: string, + itemIndex: number, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + fallbackValue?: any, + options?: IGetNodeParameterOptions, + ) => + getNodeParameter( + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.node, + parameterName, + itemIndex, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + fallbackValue, + options, + )) as ISupplyDataFunctions['getNodeParameter']; + } + + getExecutionCancelSignal() { + return this.abortSignal; + } + + onExecutionCancellation(handler: () => unknown) { + const fn = () => { + this.abortSignal?.removeEventListener('abort', fn); + handler(); + }; + this.abortSignal?.addEventListener('abort', fn); + } + + async getCredentials( + type: string, + itemIndex: number, + ) { + return await getCredentials( + this.workflow, + this.node, + type, + this.additionalData, + this.mode, + this.executeData, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + itemIndex, + ); + } + + continueOnFail() { + return continueOnFail(this.node); + } + + evaluateExpression(expression: string, itemIndex: number) { + return this.workflow.expression.resolveSimpleParameterValue( + `=${expression}`, + {}, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + } + + async executeWorkflow( + workflowInfo: IExecuteWorkflowInfo, + inputData?: INodeExecutionData[], + parentCallbackManager?: CallbackManager, + options?: { + doNotWaitToFinish?: boolean; + parentExecution?: RelatedExecution; + }, + ): Promise { + return await this.additionalData + .executeWorkflow(workflowInfo, this.additionalData, { + parentWorkflowId: this.workflow.id?.toString(), + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + ...options, + }) + .then(async (result) => { + const data = await Container.get(BinaryDataService).duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result.data, + ); + return { ...result, data }; + }); + } + + getNodeOutputs() { + const nodeType = this.workflow.nodeTypes.getByNameAndVersion( + this.node.type, + this.node.typeVersion, + ); + return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map( + (output) => { + if (typeof output === 'string') { + return { + type: output, + }; + } + return output; + }, + ); + } + + async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + return await getInputConnectionData.call( + this, + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.inputData, + this.additionalData, + this.executeData, + this.mode, + this.closeFunctions, + inputName, + itemIndex, + this.abortSignal, + ); + } + + getInputData(inputIndex = 0, inputName = 'main') { + if (!this.inputData.hasOwnProperty(inputName)) { + // Return empty array because else it would throw error when nothing is connected to input + return []; + } + + // TODO: Check if nodeType has input with that index defined + if (this.inputData[inputName].length < inputIndex) { + throw new ApplicationError('Could not get input with given index', { + extra: { inputIndex, inputName }, + }); + } + + if (this.inputData[inputName][inputIndex] === null) { + throw new ApplicationError('Value of input was not set', { + extra: { inputIndex, inputName }, + }); + } + + return this.inputData[inputName][inputIndex]; + } + + getWorkflowDataProxy(itemIndex: number) { + return new WorkflowDataProxy( + this.workflow, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + {}, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ).getDataProxy(); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sendMessageToUI(...args: any[]): void { + if (this.mode !== 'manual') { + return; + } + try { + if (this.additionalData.sendDataToUI) { + args = args.map((arg) => { + // prevent invalid dates from being logged as null + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-return + if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; + + // log valid dates in human readable format, as in browser + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-argument + if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); + if (arg instanceof Date) return arg.toString(); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return arg; + }); + + this.additionalData.sendDataToUI('sendConsoleMessage', { + source: `[Node: "${this.node.name}"]`, + messages: args, + }); + } + } catch (error) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + this.logger.warn(`There was a problem sending message to UI: ${error.message}`); + } + } + + logAiEvent(eventName: AiEvent, msg: string) { + return this.additionalData.logAiEvent(eventName, { + executionId: this.additionalData.executionId ?? 'unsaved-execution', + nodeName: this.node.name, + workflowName: this.workflow.name ?? 'Unnamed workflow', + nodeType: this.node.type, + workflowId: this.workflow.id ?? 'unsaved-workflow', + msg, + }); + } + + addInputData( + connectionType: NodeConnectionType, + data: INodeExecutionData[][], + ): { index: number } { + const nodeName = this.getNode().name; + let currentNodeRunIndex = 0; + if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { + currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; + } + + addExecutionDataFunctions( + 'input', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging input data of node "${this.node.name}": ${ + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + error.message + }`, + ); + }); + + return { index: currentNodeRunIndex }; + } + + addOutputData( + connectionType: NodeConnectionType, + currentNodeRunIndex: number, + data: INodeExecutionData[][], + metadata?: ITaskMetadata, + ): void { + addExecutionDataFunctions( + 'output', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + metadata, + ).catch((error) => { + this.logger.warn( + `There was a problem logging output data of node "${this.node.name}": ${ + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + error.message + }`, + ); + }); + } +}