refactor(Code Tool Node): Replace vm2 with taskrunner for js (#19247)

This commit is contained in:
yehorkardash
2025-09-09 11:15:31 +00:00
committed by GitHub
parent 04889864a0
commit a910604822
9 changed files with 182 additions and 39 deletions

View File

@@ -1,24 +1,27 @@
import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools'; import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools';
import { TaskRunnersConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import type { JSONSchema7 } from 'json-schema'; import type { JSONSchema7 } from 'json-schema';
import { JavaScriptSandbox } from 'n8n-nodes-base/dist/nodes/Code/JavaScriptSandbox'; import { JavaScriptSandbox } from 'n8n-nodes-base/dist/nodes/Code/JavaScriptSandbox';
import { JsTaskRunnerSandbox } from 'n8n-nodes-base/dist/nodes/Code/JsTaskRunnerSandbox';
import { PythonSandbox } from 'n8n-nodes-base/dist/nodes/Code/PythonSandbox'; import { PythonSandbox } from 'n8n-nodes-base/dist/nodes/Code/PythonSandbox';
import type { Sandbox } from 'n8n-nodes-base/dist/nodes/Code/Sandbox'; import type { Sandbox } from 'n8n-nodes-base/dist/nodes/Code/Sandbox';
import { getSandboxContext } from 'n8n-nodes-base/dist/nodes/Code/Sandbox'; import { getSandboxContext } from 'n8n-nodes-base/dist/nodes/Code/Sandbox';
import type { import type {
ExecutionError,
IDataObject,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ISupplyDataFunctions, ISupplyDataFunctions,
SupplyData, SupplyData,
ExecutionError,
IDataObject,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
jsonParse, jsonParse,
NodeConnectionTypes, NodeConnectionTypes,
NodeOperationError,
nodeNameToToolName, nodeNameToToolName,
NodeOperationError,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
buildInputSchemaField, buildInputSchemaField,
buildJsonSchemaExampleField, buildJsonSchemaExampleField,
@@ -200,6 +203,9 @@ export class ToolCode implements INodeType {
const node = this.getNode(); const node = this.getNode();
const workflowMode = this.getMode(); const workflowMode = this.getMode();
const runnersConfig = Container.get(TaskRunnersConfig);
const isRunnerEnabled = runnersConfig.enabled;
const { typeVersion } = node; const { typeVersion } = node;
const name = const name =
typeVersion <= 1.1 typeVersion <= 1.1
@@ -218,6 +224,7 @@ export class ToolCode implements INodeType {
code = this.getNodeParameter('pythonCode', itemIndex) as string; code = this.getNodeParameter('pythonCode', itemIndex) as string;
} }
// @deprecated - TODO: Remove this after a new python runner is implemented
const getSandbox = (query: string | IDataObject, index = 0) => { const getSandbox = (query: string | IDataObject, index = 0) => {
const context = getSandboxContext.call(this, index); const context = getSandboxContext.call(this, index);
context.query = query; context.query = query;
@@ -239,15 +246,31 @@ export class ToolCode implements INodeType {
return sandbox; return sandbox;
}; };
const runFunction = async (query: string | IDataObject): Promise<string> => { const runFunction = async (query: string | IDataObject): Promise<unknown> => {
const sandbox = getSandbox(query, itemIndex); if (language === 'javaScript' && isRunnerEnabled) {
return await sandbox.runCode<string>(); const sandbox = new JsTaskRunnerSandbox(
code,
'runOnceForAllItems',
workflowMode,
this,
undefined,
{
query,
},
);
const executionData = await sandbox.runCodeForTool();
return executionData;
} else {
// use old vm2-based sandbox for python or when without runner enabled
const sandbox = getSandbox(query, itemIndex);
return await sandbox.runCode<string>();
}
}; };
const toolHandler = async (query: string | IDataObject): Promise<string> => { const toolHandler = async (query: string | IDataObject): Promise<string> => {
const { index } = this.addInputData(NodeConnectionTypes.AiTool, [[{ json: { query } }]]); const { index } = this.addInputData(NodeConnectionTypes.AiTool, [[{ json: { query } }]]);
let response: string = ''; let response: any = '';
let executionError: ExecutionError | undefined; let executionError: ExecutionError | undefined;
try { try {
response = await runFunction(query); response = await runFunction(query);

View File

@@ -192,6 +192,8 @@
"@modelcontextprotocol/sdk": "1.12.0", "@modelcontextprotocol/sdk": "1.12.0",
"@mozilla/readability": "0.6.0", "@mozilla/readability": "0.6.0",
"@n8n/client-oauth2": "workspace:*", "@n8n/client-oauth2": "workspace:*",
"@n8n/config": "workspace:*",
"@n8n/di": "workspace:*",
"@n8n/errors": "workspace:^", "@n8n/errors": "workspace:^",
"@n8n/json-schema-to-zod": "workspace:*", "@n8n/json-schema-to-zod": "workspace:*",
"@n8n/typeorm": "0.3.20-12", "@n8n/typeorm": "0.3.20-12",

View File

@@ -56,6 +56,8 @@ export interface RpcCallObject {
export interface JSExecSettings { export interface JSExecSettings {
code: string; code: string;
// Additional properties to add to the context
additionalProperties?: Record<string, unknown>;
nodeMode: CodeExecutionMode; nodeMode: CodeExecutionMode;
workflowMode: WorkflowExecuteMode; workflowMode: WorkflowExecuteMode;
continueOnFail: boolean; continueOnFail: boolean;
@@ -245,6 +247,7 @@ export class JsTaskRunner extends TaskRunner {
const context = this.buildContext(taskId, workflow, data.node, dataProxy, { const context = this.buildContext(taskId, workflow, data.node, dataProxy, {
items: inputItems, items: inputItems,
...settings.additionalProperties,
}); });
try { try {
@@ -309,7 +312,13 @@ export class JsTaskRunner extends TaskRunner {
? settings.chunk.startIndex + settings.chunk.count ? settings.chunk.startIndex + settings.chunk.count
: inputItems.length; : inputItems.length;
const context = this.buildContext(taskId, workflow, data.node); const context = this.buildContext(
taskId,
workflow,
data.node,
undefined,
settings.additionalProperties,
);
for (let index = chunkStartIdx; index < chunkEndIdx; index++) { for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
const dataProxy = this.createDataProxy(data, workflow, index); const dataProxy = this.createDataProxy(data, workflow, index);

View File

@@ -21,6 +21,8 @@ import type {
ISourceData, ISourceData,
AiEvent, AiEvent,
NodeConnectionType, NodeConnectionType,
Result,
IExecuteFunctions,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
ApplicationError, ApplicationError,
@@ -28,6 +30,7 @@ import {
NodeConnectionTypes, NodeConnectionTypes,
WAIT_INDEFINITELY, WAIT_INDEFINITELY,
WorkflowDataProxy, WorkflowDataProxy,
createEnvProviderState,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { BinaryDataService } from '@/binary-data/binary-data.service'; import { BinaryDataService } from '@/binary-data/binary-data.service';
@@ -229,4 +232,29 @@ export class BaseExecuteContext extends NodeExecutionContext {
msg, msg,
}); });
} }
async startJob<T = unknown, E = unknown>(
jobType: string,
settings: unknown,
itemIndex: number,
): Promise<Result<T, E>> {
return await this.additionalData.startRunnerTask<T, E>(
this.additionalData,
jobType,
settings,
this as IExecuteFunctions,
this.inputData,
this.node,
this.workflow,
this.runExecutionData,
this.runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
createEnvProviderState(),
this.executeData,
);
}
} }

View File

@@ -14,7 +14,6 @@ import type {
ITaskDataConnections, ITaskDataConnections,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
NodeExecutionHint, NodeExecutionHint,
Result,
StructuredChunk, StructuredChunk,
Workflow, Workflow,
WorkflowExecuteMode, WorkflowExecuteMode,
@@ -22,7 +21,6 @@ import type {
import { import {
ApplicationError, ApplicationError,
createDeferredPromise, createDeferredPromise,
createEnvProviderState,
jsonParse, jsonParse,
NodeConnectionTypes, NodeConnectionTypes,
} from 'n8n-workflow'; } from 'n8n-workflow';
@@ -173,31 +171,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
await this.additionalData.hooks?.runHook('sendChunk', [message]); await this.additionalData.hooks?.runHook('sendChunk', [message]);
} }
async startJob<T = unknown, E = unknown>(
jobType: string,
settings: unknown,
itemIndex: number,
): Promise<Result<T, E>> {
return await this.additionalData.startRunnerTask<T, E>(
this.additionalData,
jobType,
settings,
this,
this.inputData,
this.node,
this.workflow,
this.runExecutionData,
this.runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
createEnvProviderState(),
this.executeData,
);
}
async getInputConnectionData( async getInputConnectionData(
connectionType: AINodeConnectionType, connectionType: AINodeConnectionType,
itemIndex: number, itemIndex: number,

View File

@@ -22,8 +22,12 @@ export class JsTaskRunnerSandbox {
private readonly jsCode: string, private readonly jsCode: string,
private readonly nodeMode: CodeExecutionMode, private readonly nodeMode: CodeExecutionMode,
private readonly workflowMode: WorkflowExecuteMode, private readonly workflowMode: WorkflowExecuteMode,
private readonly executeFunctions: IExecuteFunctions, private readonly executeFunctions: Pick<
IExecuteFunctions,
'startJob' | 'continueOnFail' | 'helpers'
>,
private readonly chunkSize = 1000, private readonly chunkSize = 1000,
private readonly additionalProperties: Record<string, unknown> = {},
) {} ) {}
async runCodeAllItems(): Promise<INodeExecutionData[]> { async runCodeAllItems(): Promise<INodeExecutionData[]> {
@@ -36,6 +40,7 @@ export class JsTaskRunnerSandbox {
nodeMode: this.nodeMode, nodeMode: this.nodeMode,
workflowMode: this.workflowMode, workflowMode: this.workflowMode,
continueOnFail: this.executeFunctions.continueOnFail(), continueOnFail: this.executeFunctions.continueOnFail(),
additionalProperties: this.additionalProperties,
}, },
itemIndex, itemIndex,
); );
@@ -51,6 +56,28 @@ export class JsTaskRunnerSandbox {
); );
} }
async runCodeForTool(): Promise<unknown> {
const itemIndex = 0;
const executionResult = await this.executeFunctions.startJob(
'javascript',
{
code: this.jsCode,
nodeMode: this.nodeMode,
workflowMode: this.workflowMode,
continueOnFail: this.executeFunctions.continueOnFail(),
additionalProperties: this.additionalProperties,
},
itemIndex,
);
if (!executionResult.ok) {
throwExecutionError('error' in executionResult ? executionResult.error : {});
}
return executionResult.result;
}
async runCodeForEachItem(numInputItems: number): Promise<INodeExecutionData[]> { async runCodeForEachItem(numInputItems: number): Promise<INodeExecutionData[]> {
validateNoDisallowedMethodsInRunForEach(this.jsCode, 0); validateNoDisallowedMethodsInRunForEach(this.jsCode, 0);
@@ -70,6 +97,7 @@ export class JsTaskRunnerSandbox {
startIndex: chunk.startIdx, startIndex: chunk.startIdx,
count: chunk.count, count: chunk.count,
}, },
additionalProperties: this.additionalProperties,
}, },
itemIndex, itemIndex,
); );

View File

@@ -1,6 +1,6 @@
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions } from 'n8n-workflow'; import type { IExecuteFunctions } from 'n8n-workflow';
import { createResultOk } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow';
import { JsTaskRunnerSandbox } from '../JsTaskRunnerSandbox'; import { JsTaskRunnerSandbox } from '../JsTaskRunnerSandbox';
@@ -15,7 +15,8 @@ describe('JsTaskRunnerSandbox', () => {
...executeFunctions.helpers, ...executeFunctions.helpers,
normalizeItems: jest normalizeItems: jest
.fn() .fn()
.mockImplementation((items) => (Array.isArray(items) ? items : [items])), // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-return
.mockImplementation((items: any) => (Array.isArray(items) ? items : [items])),
}; };
const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions, 2); const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions, 2);
@@ -37,6 +38,7 @@ describe('JsTaskRunnerSandbox', () => {
workflowMode, workflowMode,
continueOnFail: executeFunctions.continueOnFail(), continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 0, count: 2 }, chunk: { startIndex: 0, count: 2 },
additionalProperties: {},
}, },
0, 0,
], ],
@@ -48,6 +50,7 @@ describe('JsTaskRunnerSandbox', () => {
workflowMode, workflowMode,
continueOnFail: executeFunctions.continueOnFail(), continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 2, count: 2 }, chunk: { startIndex: 2, count: 2 },
additionalProperties: {},
}, },
0, 0,
], ],
@@ -59,10 +62,80 @@ describe('JsTaskRunnerSandbox', () => {
workflowMode, workflowMode,
continueOnFail: executeFunctions.continueOnFail(), continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 4, count: 1 }, chunk: { startIndex: 4, count: 1 },
additionalProperties: {},
}, },
0, 0,
], ],
]); ]);
}); });
}); });
describe('runCodeForTool', () => {
it('should execute code and return string result', async () => {
const jsCode = 'return "Hello World";';
const nodeMode = 'runOnceForAllItems';
const workflowMode = 'manual';
const executeFunctions = mock<IExecuteFunctions>();
executeFunctions.helpers = {
...executeFunctions.helpers,
normalizeItems: jest
.fn()
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-return
.mockImplementation((items: any) => (Array.isArray(items) ? items : [items])),
};
const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions);
const expectedResult = 'Hello World';
executeFunctions.startJob.mockResolvedValue(createResultOk(expectedResult));
const result = await sandbox.runCodeForTool();
expect(result).toBe(expectedResult);
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(executeFunctions.startJob).toHaveBeenCalledTimes(1);
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(executeFunctions.startJob).toHaveBeenCalledWith(
'javascript',
{
code: jsCode,
nodeMode,
workflowMode,
continueOnFail: executeFunctions.continueOnFail(),
additionalProperties: {},
},
0,
);
});
it('should handle execution errors by calling throwExecutionError', async () => {
const jsCode = 'throw new Error("execution failed");';
const nodeMode = 'runOnceForAllItems';
const workflowMode = 'manual';
const executeFunctions = mock<IExecuteFunctions>();
executeFunctions.helpers = {
...executeFunctions.helpers,
normalizeItems: jest
.fn()
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-return
.mockImplementation((items: any) => (Array.isArray(items) ? items : [items])),
};
const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions);
const executionError = { message: 'execution failed', stack: 'error stack' };
executeFunctions.startJob.mockResolvedValue(createResultError(executionError));
// Mock throwExecutionError to throw an error for testing
const throwExecutionErrorModule = await import('../throw-execution-error');
const throwExecutionErrorSpy = jest
.spyOn(throwExecutionErrorModule, 'throwExecutionError')
.mockImplementation(() => {
throw new Error('Execution failed');
});
await expect(sandbox.runCodeForTool()).rejects.toThrow('Execution failed');
expect(throwExecutionErrorSpy).toHaveBeenCalledWith(executionError);
});
});
}); });

View File

@@ -1053,6 +1053,7 @@ export type ISupplyDataFunctions = ExecuteFunctions.GetNodeParameterFn &
| 'getNodeOutputs' | 'getNodeOutputs'
| 'executeWorkflow' | 'executeWorkflow'
| 'sendMessageToUI' | 'sendMessageToUI'
| 'startJob'
| 'helpers' | 'helpers'
> & { > & {
getNextRunIndex(): number; getNextRunIndex(): number;

6
pnpm-lock.yaml generated
View File

@@ -1081,6 +1081,12 @@ importers:
'@n8n/client-oauth2': '@n8n/client-oauth2':
specifier: workspace:* specifier: workspace:*
version: link:../client-oauth2 version: link:../client-oauth2
'@n8n/config':
specifier: workspace:*
version: link:../config
'@n8n/di':
specifier: workspace:*
version: link:../di
'@n8n/errors': '@n8n/errors':
specifier: workspace:^ specifier: workspace:^
version: link:../errors version: link:../errors