mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
feat: Add once for each item support for JS task runner (no-changelog) (#11109)
This commit is contained in:
@@ -6,13 +6,14 @@
|
||||
"start": "node dist/start.js",
|
||||
"dev": "pnpm build && pnpm start",
|
||||
"typecheck": "tsc --noEmit",
|
||||
"build": "tsc -p ./tsconfig.build.json",
|
||||
"build": "tsc -p ./tsconfig.build.json && tsc-alias -p tsconfig.build.json",
|
||||
"format": "biome format --write src",
|
||||
"format:check": "biome ci src",
|
||||
"test": "echo \"Error: no tests in this package\" && exit 0",
|
||||
"test": "jest",
|
||||
"test:watch": "jest --watch",
|
||||
"lint": "eslint . --quiet",
|
||||
"lintfix": "eslint . --fix",
|
||||
"watch": "tsc -p tsconfig.build.json --watch"
|
||||
"watch": "concurrently \"tsc -w -p tsconfig.build.json\" \"tsc-alias -w -p tsconfig.build.json\""
|
||||
},
|
||||
"main": "dist/start.js",
|
||||
"module": "src/start.ts",
|
||||
|
||||
319
packages/@n8n/task-runner/src/__tests__/code.test.ts
Normal file
319
packages/@n8n/task-runner/src/__tests__/code.test.ts
Normal file
@@ -0,0 +1,319 @@
|
||||
import type { CodeExecutionMode, IDataObject, WorkflowExecuteMode } from 'n8n-workflow';
|
||||
|
||||
import { JsTaskRunner, type AllCodeTaskData, type JSExecSettings } from '@/code';
|
||||
import type { Task } from '@/task-runner';
|
||||
import { ValidationError } from '@/validation-error';
|
||||
|
||||
import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
|
||||
|
||||
jest.mock('ws');
|
||||
|
||||
describe('JsTaskRunner', () => {
|
||||
const jsTaskRunner = new JsTaskRunner('taskType', 'ws://localhost', 'grantToken', 1);
|
||||
|
||||
const execTaskWithParams = async ({
|
||||
task,
|
||||
taskData,
|
||||
}: {
|
||||
task: Task<JSExecSettings>;
|
||||
taskData: AllCodeTaskData;
|
||||
}) => {
|
||||
jest.spyOn(jsTaskRunner, 'requestData').mockResolvedValue(taskData);
|
||||
return await jsTaskRunner.executeTask(task);
|
||||
};
|
||||
|
||||
afterEach(() => {
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('console', () => {
|
||||
test.each<[CodeExecutionMode, WorkflowExecuteMode]>([
|
||||
['runOnceForAllItems', 'cli'],
|
||||
['runOnceForAllItems', 'error'],
|
||||
['runOnceForAllItems', 'integrated'],
|
||||
['runOnceForAllItems', 'internal'],
|
||||
['runOnceForAllItems', 'retry'],
|
||||
['runOnceForAllItems', 'trigger'],
|
||||
['runOnceForAllItems', 'webhook'],
|
||||
['runOnceForEachItem', 'cli'],
|
||||
['runOnceForEachItem', 'error'],
|
||||
['runOnceForEachItem', 'integrated'],
|
||||
['runOnceForEachItem', 'internal'],
|
||||
['runOnceForEachItem', 'retry'],
|
||||
['runOnceForEachItem', 'trigger'],
|
||||
['runOnceForEachItem', 'webhook'],
|
||||
])(
|
||||
'should make an rpc call for console log in %s mode when workflow mode is %s',
|
||||
async (nodeMode, workflowMode) => {
|
||||
jest.spyOn(console, 'log').mockImplementation(() => {});
|
||||
jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
|
||||
const task = newTaskWithSettings({
|
||||
code: "console.log('Hello', 'world!'); return {}",
|
||||
nodeMode,
|
||||
workflowMode,
|
||||
});
|
||||
|
||||
await execTaskWithParams({
|
||||
task,
|
||||
taskData: newAllCodeTaskData([wrapIntoJson({})]),
|
||||
});
|
||||
|
||||
expect(console.log).toHaveBeenCalledWith('[JS Code]', 'Hello world!');
|
||||
expect(jsTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
|
||||
'Hello world!',
|
||||
]);
|
||||
},
|
||||
);
|
||||
|
||||
test.each<[CodeExecutionMode, WorkflowExecuteMode]>([
|
||||
['runOnceForAllItems', 'manual'],
|
||||
['runOnceForEachItem', 'manual'],
|
||||
])(
|
||||
"shouldn't make an rpc call for console log in %s mode when workflow mode is %s",
|
||||
async (nodeMode, workflowMode) => {
|
||||
jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
|
||||
const task = newTaskWithSettings({
|
||||
code: "console.log('Hello', 'world!'); return {}",
|
||||
nodeMode,
|
||||
workflowMode,
|
||||
});
|
||||
|
||||
await execTaskWithParams({
|
||||
task,
|
||||
taskData: newAllCodeTaskData([wrapIntoJson({})]),
|
||||
});
|
||||
|
||||
expect(jsTaskRunner.makeRpcCall).not.toHaveBeenCalled();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('runOnceForAllItems', () => {
|
||||
const executeForAllItems = async ({
|
||||
code,
|
||||
inputItems,
|
||||
settings,
|
||||
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
|
||||
return await execTaskWithParams({
|
||||
task: newTaskWithSettings({
|
||||
code,
|
||||
nodeMode: 'runOnceForAllItems',
|
||||
...settings,
|
||||
}),
|
||||
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
|
||||
});
|
||||
};
|
||||
|
||||
describe('continue on fail', () => {
|
||||
it('should return an item with the error if continueOnFail is true', async () => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: 'throw new Error("Error message")',
|
||||
inputItems: [{ a: 1 }],
|
||||
settings: { continueOnFail: true },
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [wrapIntoJson({ error: 'Error message' })],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error if continueOnFail is false', async () => {
|
||||
await expect(
|
||||
executeForAllItems({
|
||||
code: 'throw new Error("Error message")',
|
||||
inputItems: [{ a: 1 }],
|
||||
settings: { continueOnFail: false },
|
||||
}),
|
||||
).rejects.toThrow('Error message');
|
||||
});
|
||||
});
|
||||
|
||||
describe('invalid output', () => {
|
||||
test.each([['undefined'], ['42'], ['"a string"']])(
|
||||
'should throw a ValidationError if the code output is %s',
|
||||
async (output) => {
|
||||
await expect(
|
||||
executeForAllItems({
|
||||
code: `return ${output}`,
|
||||
inputItems: [{ a: 1 }],
|
||||
}),
|
||||
).rejects.toThrow(ValidationError);
|
||||
},
|
||||
);
|
||||
|
||||
it('should throw a ValidationError if some items are wrapped in json and some are not', async () => {
|
||||
await expect(
|
||||
executeForAllItems({
|
||||
code: 'return [{b: 1}, {json: {b: 2}}]',
|
||||
inputItems: [{ a: 1 }],
|
||||
}),
|
||||
).rejects.toThrow(ValidationError);
|
||||
});
|
||||
});
|
||||
|
||||
it('should return static items', async () => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: 'return [{json: {b: 1}}]',
|
||||
inputItems: [{ a: 1 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [wrapIntoJson({ b: 1 })],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('maps null into an empty array', async () => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: 'return null',
|
||||
inputItems: [{ a: 1 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("should wrap items into json if they aren't", async () => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: 'return [{b: 1}]',
|
||||
inputItems: [{ a: 1 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [wrapIntoJson({ b: 1 })],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should wrap single item into an array and json', async () => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: 'return {b: 1}',
|
||||
inputItems: [{ a: 1 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [wrapIntoJson({ b: 1 })],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
test.each([['items'], ['$input.all()'], ["$('Trigger').all()"]])(
|
||||
'should have all input items in the context as %s',
|
||||
async (expression) => {
|
||||
const outcome = await executeForAllItems({
|
||||
code: `return ${expression}`,
|
||||
inputItems: [{ a: 1 }, { a: 2 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [wrapIntoJson({ a: 1 }), wrapIntoJson({ a: 2 })],
|
||||
customData: undefined,
|
||||
});
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('runForEachItem', () => {
|
||||
const executeForEachItem = async ({
|
||||
code,
|
||||
inputItems,
|
||||
settings,
|
||||
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
|
||||
return await execTaskWithParams({
|
||||
task: newTaskWithSettings({
|
||||
code,
|
||||
nodeMode: 'runOnceForEachItem',
|
||||
...settings,
|
||||
}),
|
||||
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
|
||||
});
|
||||
};
|
||||
|
||||
describe('continue on fail', () => {
|
||||
it('should return an item with the error if continueOnFail is true', async () => {
|
||||
const outcome = await executeForEachItem({
|
||||
code: 'throw new Error("Error message")',
|
||||
inputItems: [{ a: 1 }, { a: 2 }],
|
||||
settings: { continueOnFail: true },
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [
|
||||
withPairedItem(0, wrapIntoJson({ error: 'Error message' })),
|
||||
withPairedItem(1, wrapIntoJson({ error: 'Error message' })),
|
||||
],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error if continueOnFail is false', async () => {
|
||||
await expect(
|
||||
executeForEachItem({
|
||||
code: 'throw new Error("Error message")',
|
||||
inputItems: [{ a: 1 }],
|
||||
settings: { continueOnFail: false },
|
||||
}),
|
||||
).rejects.toThrow('Error message');
|
||||
});
|
||||
});
|
||||
|
||||
describe('invalid output', () => {
|
||||
test.each([['undefined'], ['42'], ['"a string"'], ['[]'], ['[1,2,3]']])(
|
||||
'should throw a ValidationError if the code output is %s',
|
||||
async (output) => {
|
||||
await expect(
|
||||
executeForEachItem({
|
||||
code: `return ${output}`,
|
||||
inputItems: [{ a: 1 }],
|
||||
}),
|
||||
).rejects.toThrow(ValidationError);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it('should return static items', async () => {
|
||||
const outcome = await executeForEachItem({
|
||||
code: 'return {json: {b: 1}}',
|
||||
inputItems: [{ a: 1 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [withPairedItem(0, wrapIntoJson({ b: 1 }))],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should filter out null values', async () => {
|
||||
const outcome = await executeForEachItem({
|
||||
code: 'return item.json.a === 1 ? item : null',
|
||||
inputItems: [{ a: 1 }, { a: 2 }, { a: 3 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [withPairedItem(0, wrapIntoJson({ a: 1 }))],
|
||||
customData: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
test.each([['item'], ['$input.item'], ['{ json: $json }']])(
|
||||
'should have the current input item in the context as %s',
|
||||
async (expression) => {
|
||||
const outcome = await executeForEachItem({
|
||||
code: `return ${expression}`,
|
||||
inputItems: [{ a: 1 }, { a: 2 }],
|
||||
});
|
||||
|
||||
expect(outcome).toEqual({
|
||||
result: [
|
||||
withPairedItem(0, wrapIntoJson({ a: 1 })),
|
||||
withPairedItem(1, wrapIntoJson({ a: 2 })),
|
||||
],
|
||||
customData: undefined,
|
||||
});
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
148
packages/@n8n/task-runner/src/__tests__/test-data.ts
Normal file
148
packages/@n8n/task-runner/src/__tests__/test-data.ts
Normal file
@@ -0,0 +1,148 @@
|
||||
import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow';
|
||||
import { NodeConnectionType } from 'n8n-workflow';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type { AllCodeTaskData, JSExecSettings } from '@/code';
|
||||
import type { Task } from '@/task-runner';
|
||||
|
||||
/**
|
||||
* Creates a new task with the given settings
|
||||
*/
|
||||
export const newTaskWithSettings = (
|
||||
settings: Partial<JSExecSettings> & Pick<JSExecSettings, 'code' | 'nodeMode'>,
|
||||
): Task<JSExecSettings> => ({
|
||||
taskId: '1',
|
||||
settings: {
|
||||
workflowMode: 'manual',
|
||||
continueOnFail: false,
|
||||
mode: 'manual',
|
||||
...settings,
|
||||
},
|
||||
active: true,
|
||||
cancelled: false,
|
||||
});
|
||||
|
||||
/**
|
||||
* Creates a new node with the given options
|
||||
*/
|
||||
export const newNode = (opts: Partial<INode> = {}): INode => ({
|
||||
id: nanoid(),
|
||||
name: 'Test Node' + nanoid(),
|
||||
parameters: {},
|
||||
position: [0, 0],
|
||||
type: 'n8n-nodes-base.code',
|
||||
typeVersion: 1,
|
||||
...opts,
|
||||
});
|
||||
|
||||
/**
|
||||
* Creates a new task data with the given options
|
||||
*/
|
||||
export const newTaskData = (opts: Partial<ITaskData> & Pick<ITaskData, 'source'>): ITaskData => ({
|
||||
startTime: Date.now(),
|
||||
executionTime: 0,
|
||||
executionStatus: 'success',
|
||||
...opts,
|
||||
});
|
||||
|
||||
/**
|
||||
* Creates a new all code task data with the given options
|
||||
*/
|
||||
export const newAllCodeTaskData = (
|
||||
codeNodeInputData: INodeExecutionData[],
|
||||
opts: Partial<AllCodeTaskData> = {},
|
||||
): AllCodeTaskData => {
|
||||
const codeNode = newNode({
|
||||
name: 'JsCode',
|
||||
parameters: {
|
||||
mode: 'runOnceForEachItem',
|
||||
language: 'javaScript',
|
||||
jsCode: 'return item',
|
||||
},
|
||||
type: 'n8n-nodes-base.code',
|
||||
typeVersion: 2,
|
||||
});
|
||||
const manualTriggerNode = newNode({
|
||||
name: 'Trigger',
|
||||
type: 'n8n-nodes-base.manualTrigger',
|
||||
});
|
||||
|
||||
return {
|
||||
workflow: {
|
||||
id: '1',
|
||||
name: 'Test Workflow',
|
||||
active: true,
|
||||
connections: {
|
||||
[manualTriggerNode.name]: {
|
||||
main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]],
|
||||
},
|
||||
},
|
||||
nodes: [manualTriggerNode, codeNode],
|
||||
},
|
||||
inputData: {
|
||||
main: [codeNodeInputData],
|
||||
},
|
||||
connectionInputData: codeNodeInputData,
|
||||
node: codeNode,
|
||||
runExecutionData: {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {
|
||||
[manualTriggerNode.name]: [
|
||||
newTaskData({
|
||||
source: [],
|
||||
data: {
|
||||
main: [codeNodeInputData],
|
||||
},
|
||||
}),
|
||||
],
|
||||
},
|
||||
pinData: {},
|
||||
lastNodeExecuted: manualTriggerNode.name,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
},
|
||||
runIndex: 0,
|
||||
itemIndex: 0,
|
||||
activeNodeName: codeNode.name,
|
||||
contextNodeName: codeNode.name,
|
||||
defaultReturnRunIndex: -1,
|
||||
siblingParameters: {},
|
||||
mode: 'manual',
|
||||
selfData: {},
|
||||
additionalData: {
|
||||
formWaitingBaseUrl: '',
|
||||
instanceBaseUrl: '',
|
||||
restartExecutionId: '',
|
||||
restApiUrl: '',
|
||||
webhookBaseUrl: '',
|
||||
webhookTestBaseUrl: '',
|
||||
webhookWaitingBaseUrl: '',
|
||||
variables: {},
|
||||
},
|
||||
...opts,
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Wraps the given value into an INodeExecutionData object's json property
|
||||
*/
|
||||
export const wrapIntoJson = (json: IDataObject): INodeExecutionData => ({
|
||||
json,
|
||||
});
|
||||
|
||||
/**
|
||||
* Adds the given index as the pairedItem property to the given INodeExecutionData object
|
||||
*/
|
||||
export const withPairedItem = (index: number, data: INodeExecutionData): INodeExecutionData => ({
|
||||
...data,
|
||||
pairedItem: {
|
||||
item: index,
|
||||
},
|
||||
});
|
||||
@@ -1,28 +1,36 @@
|
||||
import { getAdditionalKeys } from 'n8n-core';
|
||||
import {
|
||||
type INode,
|
||||
type INodeType,
|
||||
type ITaskDataConnections,
|
||||
type IWorkflowExecuteAdditionalData,
|
||||
WorkflowDataProxy,
|
||||
type WorkflowParameters,
|
||||
type IDataObject,
|
||||
type IExecuteData,
|
||||
type INodeExecutionData,
|
||||
type INodeParameters,
|
||||
type IRunExecutionData,
|
||||
// type IWorkflowDataProxyAdditionalKeys,
|
||||
Workflow,
|
||||
type WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import type {
|
||||
CodeExecutionMode,
|
||||
INode,
|
||||
INodeType,
|
||||
ITaskDataConnections,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
WorkflowParameters,
|
||||
IDataObject,
|
||||
IExecuteData,
|
||||
INodeExecutionData,
|
||||
INodeParameters,
|
||||
IRunExecutionData,
|
||||
WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import * as a from 'node:assert';
|
||||
import { runInNewContext, type Context } from 'node:vm';
|
||||
|
||||
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from '@/result-validation';
|
||||
|
||||
import type { TaskResultData } from './runner-types';
|
||||
import { type Task, TaskRunner } from './task-runner';
|
||||
|
||||
interface JSExecSettings {
|
||||
export interface JSExecSettings {
|
||||
code: string;
|
||||
nodeMode: CodeExecutionMode;
|
||||
workflowMode: WorkflowExecuteMode;
|
||||
continueOnFail: boolean;
|
||||
|
||||
// For workflow data proxy
|
||||
mode: WorkflowExecuteMode;
|
||||
@@ -62,6 +70,12 @@ export interface AllCodeTaskData {
|
||||
additionalData: PartialAdditionalData;
|
||||
}
|
||||
|
||||
type CustomConsole = {
|
||||
log: (...args: unknown[]) => void;
|
||||
};
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
export class JsTaskRunner extends TaskRunner {
|
||||
constructor(
|
||||
taskType: string,
|
||||
@@ -95,15 +109,154 @@ export class JsTaskRunner extends TaskRunner {
|
||||
},
|
||||
});
|
||||
|
||||
const dataProxy = new WorkflowDataProxy(
|
||||
const customConsole = {
|
||||
log:
|
||||
settings.workflowMode === 'manual'
|
||||
? noop
|
||||
: (...args: unknown[]) => {
|
||||
const logOutput = args
|
||||
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
|
||||
.join(' ');
|
||||
console.log('[JS Code]', logOutput);
|
||||
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
|
||||
},
|
||||
};
|
||||
|
||||
const result =
|
||||
settings.nodeMode === 'runOnceForAllItems'
|
||||
? await this.runForAllItems(task.taskId, settings, allData, workflow, customConsole)
|
||||
: await this.runForEachItem(task.taskId, settings, allData, workflow, customConsole);
|
||||
|
||||
return {
|
||||
result,
|
||||
customData: allData.runExecutionData.resultData.metadata,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the requested code for all items in a single run
|
||||
*/
|
||||
private async runForAllItems(
|
||||
taskId: string,
|
||||
settings: JSExecSettings,
|
||||
allData: AllCodeTaskData,
|
||||
workflow: Workflow,
|
||||
customConsole: CustomConsole,
|
||||
): Promise<INodeExecutionData[]> {
|
||||
const dataProxy = this.createDataProxy(allData, workflow, allData.itemIndex);
|
||||
const inputItems = allData.connectionInputData;
|
||||
|
||||
const context: Context = {
|
||||
require,
|
||||
module: {},
|
||||
console: customConsole,
|
||||
|
||||
items: inputItems,
|
||||
...dataProxy,
|
||||
...this.buildRpcCallObject(taskId),
|
||||
};
|
||||
|
||||
try {
|
||||
const result = (await runInNewContext(
|
||||
`module.exports = async function() {${settings.code}\n}()`,
|
||||
context,
|
||||
)) as TaskResultData['result'];
|
||||
|
||||
if (result === null) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return validateRunForAllItemsOutput(result);
|
||||
} catch (error) {
|
||||
if (settings.continueOnFail) {
|
||||
return [{ json: { error: this.getErrorMessageFromVmError(error) } }];
|
||||
}
|
||||
|
||||
(error as Record<string, unknown>).node = allData.node;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the requested code for each item in the input data
|
||||
*/
|
||||
private async runForEachItem(
|
||||
taskId: string,
|
||||
settings: JSExecSettings,
|
||||
allData: AllCodeTaskData,
|
||||
workflow: Workflow,
|
||||
customConsole: CustomConsole,
|
||||
): Promise<INodeExecutionData[]> {
|
||||
const inputItems = allData.connectionInputData;
|
||||
const returnData: INodeExecutionData[] = [];
|
||||
|
||||
for (let index = 0; index < inputItems.length; index++) {
|
||||
const item = inputItems[index];
|
||||
const dataProxy = this.createDataProxy(allData, workflow, index);
|
||||
const context: Context = {
|
||||
require,
|
||||
module: {},
|
||||
console: customConsole,
|
||||
item,
|
||||
|
||||
...dataProxy,
|
||||
...this.buildRpcCallObject(taskId),
|
||||
};
|
||||
|
||||
try {
|
||||
let result = (await runInNewContext(
|
||||
`module.exports = async function() {${settings.code}\n}()`,
|
||||
context,
|
||||
)) as INodeExecutionData | undefined;
|
||||
|
||||
// Filter out null values
|
||||
if (result === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
result = validateRunForEachItemOutput(result, index);
|
||||
if (result) {
|
||||
returnData.push(
|
||||
result.binary
|
||||
? {
|
||||
json: result.json,
|
||||
pairedItem: { item: index },
|
||||
binary: result.binary,
|
||||
}
|
||||
: {
|
||||
json: result.json,
|
||||
pairedItem: { item: index },
|
||||
},
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
if (!settings.continueOnFail) {
|
||||
(error as Record<string, unknown>).node = allData.node;
|
||||
throw error;
|
||||
}
|
||||
|
||||
returnData.push({
|
||||
json: { error: this.getErrorMessageFromVmError(error) },
|
||||
pairedItem: {
|
||||
item: index,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return returnData;
|
||||
}
|
||||
|
||||
private createDataProxy(allData: AllCodeTaskData, workflow: Workflow, itemIndex: number) {
|
||||
return new WorkflowDataProxy(
|
||||
workflow,
|
||||
allData.runExecutionData,
|
||||
allData.runIndex,
|
||||
allData.itemIndex,
|
||||
itemIndex,
|
||||
allData.activeNodeName,
|
||||
allData.connectionInputData,
|
||||
allData.siblingParameters,
|
||||
settings.mode,
|
||||
allData.mode,
|
||||
getAdditionalKeys(
|
||||
allData.additionalData as IWorkflowExecuteAdditionalData,
|
||||
allData.mode,
|
||||
@@ -113,35 +266,14 @@ export class JsTaskRunner extends TaskRunner {
|
||||
allData.defaultReturnRunIndex,
|
||||
allData.selfData,
|
||||
allData.contextNodeName,
|
||||
);
|
||||
).getDataProxy();
|
||||
}
|
||||
|
||||
const customConsole = {
|
||||
log: (...args: unknown[]) => {
|
||||
const logOutput = args
|
||||
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
|
||||
.join(' ');
|
||||
console.log('[JS Code]', logOutput);
|
||||
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
|
||||
},
|
||||
};
|
||||
private getErrorMessageFromVmError(error: unknown): string {
|
||||
if (typeof error === 'object' && !!error && 'message' in error) {
|
||||
return error.message as string;
|
||||
}
|
||||
|
||||
const context: Context = {
|
||||
require,
|
||||
module: {},
|
||||
console: customConsole,
|
||||
|
||||
...dataProxy.getDataProxy(),
|
||||
...this.buildRpcCallObject(task.taskId),
|
||||
};
|
||||
|
||||
const result = (await runInNewContext(
|
||||
`module.exports = async function() {${settings.code}\n}()`,
|
||||
context,
|
||||
)) as TaskResultData['result'];
|
||||
|
||||
return {
|
||||
result,
|
||||
customData: allData.runExecutionData.resultData.metadata,
|
||||
};
|
||||
return JSON.stringify(error);
|
||||
}
|
||||
}
|
||||
|
||||
84
packages/@n8n/task-runner/src/execution-error.ts
Normal file
84
packages/@n8n/task-runner/src/execution-error.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class ExecutionError extends ApplicationError {
|
||||
description: string | null = null;
|
||||
|
||||
itemIndex: number | undefined = undefined;
|
||||
|
||||
context: { itemIndex: number } | undefined = undefined;
|
||||
|
||||
stack = '';
|
||||
|
||||
lineNumber: number | undefined = undefined;
|
||||
|
||||
constructor(error: Error & { stack?: string }, itemIndex?: number) {
|
||||
super(error.message);
|
||||
this.itemIndex = itemIndex;
|
||||
|
||||
if (this.itemIndex !== undefined) {
|
||||
this.context = { itemIndex: this.itemIndex };
|
||||
}
|
||||
|
||||
this.stack = error.stack ?? '';
|
||||
|
||||
this.populateFromStack();
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate error `message` and `description` from error `stack`.
|
||||
*/
|
||||
private populateFromStack() {
|
||||
const stackRows = this.stack.split('\n');
|
||||
|
||||
if (stackRows.length === 0) {
|
||||
this.message = 'Unknown error';
|
||||
}
|
||||
|
||||
const messageRow = stackRows.find((line) => line.includes('Error:'));
|
||||
const lineNumberRow = stackRows.find((line) => line.includes('Code:'));
|
||||
const lineNumberDisplay = this.toLineNumberDisplay(lineNumberRow);
|
||||
|
||||
if (!messageRow) {
|
||||
this.message = `Unknown error ${lineNumberDisplay}`;
|
||||
return;
|
||||
}
|
||||
|
||||
const [errorDetails, errorType] = this.toErrorDetailsAndType(messageRow);
|
||||
|
||||
if (errorType) this.description = errorType;
|
||||
|
||||
if (!errorDetails) {
|
||||
this.message = `Unknown error ${lineNumberDisplay}`;
|
||||
return;
|
||||
}
|
||||
|
||||
this.message = `${errorDetails} ${lineNumberDisplay}`;
|
||||
}
|
||||
|
||||
private toLineNumberDisplay(lineNumberRow?: string) {
|
||||
const errorLineNumberMatch = lineNumberRow?.match(/Code:(?<lineNumber>\d+)/);
|
||||
|
||||
if (!errorLineNumberMatch?.groups?.lineNumber) return null;
|
||||
|
||||
const lineNumber = errorLineNumberMatch.groups.lineNumber;
|
||||
|
||||
this.lineNumber = Number(lineNumber);
|
||||
|
||||
if (!lineNumber) return '';
|
||||
|
||||
return this.itemIndex === undefined
|
||||
? `[line ${lineNumber}]`
|
||||
: `[line ${lineNumber}, for item ${this.itemIndex}]`;
|
||||
}
|
||||
|
||||
private toErrorDetailsAndType(messageRow?: string) {
|
||||
if (!messageRow) return [null, null];
|
||||
|
||||
const [errorDetails, errorType] = messageRow
|
||||
.split(':')
|
||||
.reverse()
|
||||
.map((i) => i.trim());
|
||||
|
||||
return [errorDetails, errorType === 'Error' ? null : errorType];
|
||||
}
|
||||
}
|
||||
5
packages/@n8n/task-runner/src/obj-utils.ts
Normal file
5
packages/@n8n/task-runner/src/obj-utils.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
export function isObject(maybe: unknown): maybe is { [key: string]: unknown } {
|
||||
return (
|
||||
typeof maybe === 'object' && maybe !== null && !Array.isArray(maybe) && !(maybe instanceof Date)
|
||||
);
|
||||
}
|
||||
116
packages/@n8n/task-runner/src/result-validation.ts
Normal file
116
packages/@n8n/task-runner/src/result-validation.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { normalizeItems } from 'n8n-core';
|
||||
import type { INodeExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { isObject } from '@/obj-utils';
|
||||
import { ValidationError } from '@/validation-error';
|
||||
|
||||
export const REQUIRED_N8N_ITEM_KEYS = new Set(['json', 'binary', 'pairedItem', 'error']);
|
||||
|
||||
function validateTopLevelKeys(item: INodeExecutionData, itemIndex: number) {
|
||||
for (const key in item) {
|
||||
if (Object.prototype.hasOwnProperty.call(item, key)) {
|
||||
if (REQUIRED_N8N_ITEM_KEYS.has(key)) return;
|
||||
|
||||
throw new ValidationError({
|
||||
message: `Unknown top-level item key: ${key}`,
|
||||
description: 'Access the properties of an item under `.json`, e.g. `item.json`',
|
||||
itemIndex,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function validateItem({ json, binary }: INodeExecutionData, itemIndex: number) {
|
||||
if (json === undefined || !isObject(json)) {
|
||||
throw new ValidationError({
|
||||
message: "A 'json' property isn't an object",
|
||||
description: "In the returned data, every key named 'json' must point to an object.",
|
||||
itemIndex,
|
||||
});
|
||||
}
|
||||
|
||||
if (binary !== undefined && !isObject(binary)) {
|
||||
throw new ValidationError({
|
||||
message: "A 'binary' property isn't an object",
|
||||
description: "In the returned data, every key named 'binary' must point to an object.",
|
||||
itemIndex,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the output of a code node in 'Run for All Items' mode.
|
||||
*/
|
||||
export function validateRunForAllItemsOutput(
|
||||
executionResult: INodeExecutionData | INodeExecutionData[] | undefined,
|
||||
) {
|
||||
if (typeof executionResult !== 'object') {
|
||||
throw new ValidationError({
|
||||
message: "Code doesn't return items properly",
|
||||
description: 'Please return an array of objects, one for each item you would like to output.',
|
||||
});
|
||||
}
|
||||
|
||||
if (Array.isArray(executionResult)) {
|
||||
/**
|
||||
* If at least one top-level key is an n8n item key (`json`, `binary`, etc.),
|
||||
* then require all item keys to be an n8n item key.
|
||||
*
|
||||
* If no top-level key is an n8n key, then skip this check, allowing non-n8n
|
||||
* item keys to be wrapped in `json` when normalizing items below.
|
||||
*/
|
||||
const mustHaveTopLevelN8nKey = executionResult.some((item) =>
|
||||
Object.keys(item).find((key) => REQUIRED_N8N_ITEM_KEYS.has(key)),
|
||||
);
|
||||
|
||||
if (mustHaveTopLevelN8nKey) {
|
||||
for (let index = 0; index < executionResult.length; index++) {
|
||||
const item = executionResult[index];
|
||||
validateTopLevelKeys(item, index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const returnData = normalizeItems(executionResult);
|
||||
returnData.forEach(validateItem);
|
||||
return returnData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the output of a code node in 'Run for Each Item' mode for single item
|
||||
*/
|
||||
export function validateRunForEachItemOutput(
|
||||
executionResult: INodeExecutionData | undefined,
|
||||
itemIndex: number,
|
||||
) {
|
||||
if (typeof executionResult !== 'object') {
|
||||
throw new ValidationError({
|
||||
message: "Code doesn't return an object",
|
||||
description: `Please return an object representing the output item. ('${executionResult}' was returned instead.)`,
|
||||
itemIndex,
|
||||
});
|
||||
}
|
||||
|
||||
if (Array.isArray(executionResult)) {
|
||||
const firstSentence =
|
||||
executionResult.length > 0
|
||||
? `An array of ${typeof executionResult[0]}s was returned.`
|
||||
: 'An empty array was returned.';
|
||||
throw new ValidationError({
|
||||
message: "Code doesn't return a single object",
|
||||
description: `${firstSentence} If you need to output multiple items, please use the 'Run Once for All Items' mode instead.`,
|
||||
itemIndex,
|
||||
});
|
||||
}
|
||||
|
||||
const [returnData] = normalizeItems([executionResult]);
|
||||
|
||||
validateItem(returnData, itemIndex);
|
||||
|
||||
// If at least one top-level key is a supported item key (`json`, `binary`, etc.),
|
||||
// and another top-level key is unrecognized, then the user mis-added a property
|
||||
// directly on the item, when they intended to add it on the `json` property
|
||||
validateTopLevelKeys(returnData, itemIndex);
|
||||
|
||||
return returnData;
|
||||
}
|
||||
@@ -257,11 +257,8 @@ export abstract class TaskRunner {
|
||||
const data = await this.executeTask(task);
|
||||
this.taskDone(taskId, data);
|
||||
} catch (e) {
|
||||
if (ensureError(e)) {
|
||||
this.taskErrored(taskId, (e as Error).message);
|
||||
} else {
|
||||
this.taskErrored(taskId, e);
|
||||
}
|
||||
const error = ensureError(e);
|
||||
this.taskErrored(taskId, error);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
44
packages/@n8n/task-runner/src/validation-error.ts
Normal file
44
packages/@n8n/task-runner/src/validation-error.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class ValidationError extends ApplicationError {
|
||||
description = '';
|
||||
|
||||
itemIndex: number | undefined = undefined;
|
||||
|
||||
context: { itemIndex: number } | undefined = undefined;
|
||||
|
||||
lineNumber: number | undefined = undefined;
|
||||
|
||||
constructor({
|
||||
message,
|
||||
description,
|
||||
itemIndex,
|
||||
lineNumber,
|
||||
}: {
|
||||
message: string;
|
||||
description: string;
|
||||
itemIndex?: number;
|
||||
lineNumber?: number;
|
||||
}) {
|
||||
super(message);
|
||||
|
||||
this.lineNumber = lineNumber;
|
||||
this.itemIndex = itemIndex;
|
||||
|
||||
if (this.lineNumber !== undefined && this.itemIndex !== undefined) {
|
||||
this.message = `${message} [line ${lineNumber}, for item ${itemIndex}]`;
|
||||
} else if (this.lineNumber !== undefined) {
|
||||
this.message = `${message} [line ${lineNumber}]`;
|
||||
} else if (this.itemIndex !== undefined) {
|
||||
this.message = `${message} [item ${itemIndex}]`;
|
||||
} else {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
this.description = description;
|
||||
|
||||
if (this.itemIndex !== undefined) {
|
||||
this.context = { itemIndex: this.itemIndex };
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user