feat: (Execute Workflow Node): Inputs for Sub-workflows (#11830) (#11837)

Co-authored-by: Charlie Kolb <charlie@n8n.io>
Co-authored-by: Milorad FIlipović <milorad@n8n.io>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Ivan Atanasov
2024-12-20 17:01:22 +01:00
committed by GitHub
parent 6c323e4e49
commit d4116630a6
52 changed files with 4023 additions and 688 deletions

View File

@@ -0,0 +1,113 @@
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, IWorkflowDataProxyData } from 'n8n-workflow';
import { ExecuteWorkflow } from './ExecuteWorkflow.node';
import { getWorkflowInfo } from './GenericFunctions';
jest.mock('./GenericFunctions');
jest.mock('../../../utils/utilities');
describe('ExecuteWorkflow', () => {
const executeWorkflow = new ExecuteWorkflow();
const executeFunctions = mock<IExecuteFunctions>({
getNodeParameter: jest.fn(),
getInputData: jest.fn(),
getWorkflowDataProxy: jest.fn(),
executeWorkflow: jest.fn(),
continueOnFail: jest.fn(),
setMetadata: jest.fn(),
getNode: jest.fn(),
});
beforeEach(() => {
jest.clearAllMocks();
executeFunctions.getInputData.mockReturnValue([{ json: { key: 'value' } }]);
executeFunctions.getWorkflowDataProxy.mockReturnValue({
$workflow: { id: 'workflowId' },
$execution: { id: 'executionId' },
} as unknown as IWorkflowDataProxyData);
});
test('should execute workflow in "each" mode and wait for sub-workflow completion', async () => {
executeFunctions.getNodeParameter
.mockReturnValueOnce('database') // source
.mockReturnValueOnce('each') // mode
.mockReturnValueOnce(true) // waitForSubWorkflow
.mockReturnValueOnce([]); // workflowInputs.schema
executeFunctions.getInputData.mockReturnValue([{ json: { key: 'value' } }]);
executeFunctions.getWorkflowDataProxy.mockReturnValue({
$workflow: { id: 'workflowId' },
$execution: { id: 'executionId' },
} as unknown as IWorkflowDataProxyData);
(getWorkflowInfo as jest.Mock).mockResolvedValue({ id: 'subWorkflowId' });
(executeFunctions.executeWorkflow as jest.Mock).mockResolvedValue({
executionId: 'subExecutionId',
data: [[{ json: { key: 'subValue' } }]],
});
const result = await executeWorkflow.execute.call(executeFunctions);
expect(result).toEqual([
[
{
json: { key: 'value' },
index: 0,
pairedItem: { item: 0 },
metadata: {
subExecution: { workflowId: 'subWorkflowId', executionId: 'subExecutionId' },
},
},
],
]);
});
test('should execute workflow in "once" mode and not wait for sub-workflow completion', async () => {
executeFunctions.getNodeParameter
.mockReturnValueOnce('database') // source
.mockReturnValueOnce('once') // mode
.mockReturnValueOnce(false) // waitForSubWorkflow
.mockReturnValueOnce([]); // workflowInputs.schema
executeFunctions.getInputData.mockReturnValue([{ json: { key: 'value' } }]);
executeFunctions.executeWorkflow.mockResolvedValue({
executionId: 'subExecutionId',
data: [[{ json: { key: 'subValue' } }]],
});
const result = await executeWorkflow.execute.call(executeFunctions);
expect(result).toEqual([[{ json: { key: 'value' }, index: 0, pairedItem: { item: 0 } }]]);
});
test('should handle errors and continue on fail', async () => {
executeFunctions.getNodeParameter
.mockReturnValueOnce('database') // source
.mockReturnValueOnce('each') // mode
.mockReturnValueOnce(true) // waitForSubWorkflow
.mockReturnValueOnce([]); // workflowInputs.schema
(getWorkflowInfo as jest.Mock).mockRejectedValue(new Error('Test error'));
(executeFunctions.continueOnFail as jest.Mock).mockReturnValue(true);
const result = await executeWorkflow.execute.call(executeFunctions);
expect(result).toEqual([[{ json: { error: 'Test error' }, pairedItem: { item: 0 } }]]);
});
test('should throw error if not continuing on fail', async () => {
executeFunctions.getNodeParameter
.mockReturnValueOnce('database') // source
.mockReturnValueOnce('each') // mode
.mockReturnValueOnce(true) // waitForSubWorkflow
.mockReturnValueOnce([]); // workflowInputs.schema
(getWorkflowInfo as jest.Mock).mockRejectedValue(new Error('Test error'));
(executeFunctions.continueOnFail as jest.Mock).mockReturnValue(false);
await expect(executeWorkflow.execute.call(executeFunctions)).rejects.toThrow(
'Error executing workflow with item at index 0',
);
});
});

View File

@@ -8,8 +8,11 @@ import type {
} from 'n8n-workflow';
import { getWorkflowInfo } from './GenericFunctions';
import { generatePairedItemData } from '../../utils/utilities';
import { generatePairedItemData } from '../../../utils/utilities';
import {
getCurrentWorkflowInputData,
loadWorkflowInputMappings,
} from '../../../utils/workflowInputsResourceMapping/GenericFunctions';
export class ExecuteWorkflow implements INodeType {
description: INodeTypeDescription = {
displayName: 'Execute Workflow',
@@ -17,7 +20,7 @@ export class ExecuteWorkflow implements INodeType {
icon: 'fa:sign-in-alt',
iconColor: 'orange-red',
group: ['transform'],
version: [1, 1.1],
version: [1, 1.1, 1.2],
subtitle: '={{"Workflow: " + $parameter["workflowId"]}}',
description: 'Execute another workflow',
defaults: {
@@ -40,6 +43,13 @@ export class ExecuteWorkflow implements INodeType {
},
],
},
{
displayName: 'This node is out of date. Please upgrade by removing it and adding a new one',
name: 'outdatedVersionWarning',
type: 'notice',
displayOptions: { show: { '@version': [{ _cnd: { lte: 1.1 } }] } },
default: '',
},
{
displayName: 'Source',
name: 'source',
@@ -68,6 +78,27 @@ export class ExecuteWorkflow implements INodeType {
],
default: 'database',
description: 'Where to get the workflow to execute from',
displayOptions: { show: { '@version': [{ _cnd: { lte: 1.1 } }] } },
},
{
displayName: 'Source',
name: 'source',
type: 'options',
options: [
{
name: 'Database',
value: 'database',
description: 'Load the workflow from the database by ID',
},
{
name: 'Define Below',
value: 'parameter',
description: 'Pass the JSON code of a workflow',
},
],
default: 'database',
description: 'Where to get the workflow to execute from',
displayOptions: { show: { '@version': [{ _cnd: { gte: 1.2 } }] } },
},
// ----------------------------------
@@ -164,6 +195,43 @@ export class ExecuteWorkflow implements INodeType {
name: 'executeWorkflowNotice',
type: 'notice',
default: '',
displayOptions: { show: { '@version': [{ _cnd: { lte: 1.1 } }] } },
},
{
displayName: 'Workflow Inputs',
name: 'workflowInputs',
type: 'resourceMapper',
noDataExpression: true,
default: {
mappingMode: 'defineBelow',
value: null,
},
required: true,
typeOptions: {
loadOptionsDependsOn: ['workflowId.value'],
resourceMapper: {
localResourceMapperMethod: 'loadWorkflowInputMappings',
valuesLabel: 'Workflow Inputs',
mode: 'map',
fieldWords: {
singular: 'input',
plural: 'inputs',
},
addAllFields: true,
multiKeyMatch: false,
supportAutoMap: false,
showTypeConversionOptions: true,
},
},
displayOptions: {
show: {
source: ['database'],
'@version': [{ _cnd: { gte: 1.2 } }],
},
hide: {
workflowId: [''],
},
},
},
{
displayName: 'Mode',
@@ -206,10 +274,16 @@ export class ExecuteWorkflow implements INodeType {
],
};
methods = {
localResourceMapping: {
loadWorkflowInputMappings,
},
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const source = this.getNodeParameter('source', 0) as string;
const mode = this.getNodeParameter('mode', 0, false) as string;
const items = this.getInputData();
const items = getCurrentWorkflowInputData.call(this);
const workflowProxy = this.getWorkflowDataProxy(0);
const currentWorkflowId = workflowProxy.$workflow.id as string;

View File

@@ -3,11 +3,16 @@ import { NodeOperationError, jsonParse } from 'n8n-workflow';
import type {
IExecuteFunctions,
IExecuteWorkflowInfo,
ILoadOptionsFunctions,
INodeParameterResourceLocator,
IRequestOptions,
} from 'n8n-workflow';
export async function getWorkflowInfo(this: IExecuteFunctions, source: string, itemIndex = 0) {
export async function getWorkflowInfo(
this: ILoadOptionsFunctions | IExecuteFunctions,
source: string,
itemIndex = 0,
) {
const workflowInfo: IExecuteWorkflowInfo = {};
const nodeVersion = this.getNode().typeVersion;
if (source === 'database') {

View File

@@ -0,0 +1,17 @@
{
"node": "n8n-nodes-base.executeWorkflowTrigger",
"nodeVersion": "1.0",
"codexVersion": "1.0",
"categories": ["Core Nodes"],
"resources": {
"primaryDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.executeworkflowtrigger/"
}
],
"generic": []
},
"subcategories": {
"Core Nodes": ["Helpers"]
}
}

View File

@@ -0,0 +1,53 @@
import { mock } from 'jest-mock-extended';
import type { FieldValueOption, IExecuteFunctions, INode, INodeExecutionData } from 'n8n-workflow';
import { ExecuteWorkflowTrigger } from './ExecuteWorkflowTrigger.node';
import { WORKFLOW_INPUTS } from '../../../utils/workflowInputsResourceMapping/constants';
import { getFieldEntries } from '../../../utils/workflowInputsResourceMapping/GenericFunctions';
jest.mock('../../../utils/workflowInputsResourceMapping/GenericFunctions', () => ({
getFieldEntries: jest.fn(),
getWorkflowInputData: jest.fn(),
}));
describe('ExecuteWorkflowTrigger', () => {
const mockInputData: INodeExecutionData[] = [
{ json: { item: 0, foo: 'bar' }, index: 0 },
{ json: { item: 1, foo: 'quz' }, index: 1 },
];
const mockNode = mock<INode>({ typeVersion: 1 });
const executeFns = mock<IExecuteFunctions>({
getInputData: () => mockInputData,
getNode: () => mockNode,
getNodeParameter: jest.fn(),
});
it('should return its input data on V1 or V1.1 passthrough', async () => {
// User selection in V1.1, or fallback return value in V1 with dropdown not displayed
executeFns.getNodeParameter.mockReturnValueOnce('passthrough');
const result = await new ExecuteWorkflowTrigger().execute.call(executeFns);
expect(result).toEqual([mockInputData]);
});
it('should filter out parent input in `Using Fields below` mode', async () => {
executeFns.getNodeParameter.mockReturnValueOnce(WORKFLOW_INPUTS);
const mockNewParams = [
{ name: 'value1', type: 'string' },
{ name: 'value2', type: 'number' },
{ name: 'foo', type: 'string' },
] as FieldValueOption[];
const getFieldEntriesMock = (getFieldEntries as jest.Mock).mockReturnValue(mockNewParams);
const result = await new ExecuteWorkflowTrigger().execute.call(executeFns);
const expected = [
[
{ index: 0, json: { value1: null, value2: null, foo: mockInputData[0].json.foo } },
{ index: 1, json: { value1: null, value2: null, foo: mockInputData[1].json.foo } },
],
];
expect(result).toEqual(expected);
expect(getFieldEntriesMock).toHaveBeenCalledWith(executeFns);
});
});

View File

@@ -0,0 +1,225 @@
import _ from 'lodash';
import {
type INodeExecutionData,
NodeConnectionType,
type IExecuteFunctions,
type INodeType,
type INodeTypeDescription,
} from 'n8n-workflow';
import {
INPUT_SOURCE,
WORKFLOW_INPUTS,
JSON_EXAMPLE,
VALUES,
TYPE_OPTIONS,
PASSTHROUGH,
FALLBACK_DEFAULT_VALUE,
} from '../../../utils/workflowInputsResourceMapping/constants';
import { getFieldEntries } from '../../../utils/workflowInputsResourceMapping/GenericFunctions';
export class ExecuteWorkflowTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'Execute Workflow Trigger',
name: 'executeWorkflowTrigger',
icon: 'fa:sign-out-alt',
group: ['trigger'],
version: [1, 1.1],
description:
'Helpers for calling other n8n workflows. Used for designing modular, microservice-like workflows.',
eventTriggerDescription: '',
maxNodes: 1,
defaults: {
name: 'Workflow Input Trigger',
color: '#ff6d5a',
},
inputs: [],
outputs: [NodeConnectionType.Main],
hints: [
{
message: 'Please make sure to define your input fields.',
// This condition checks if we have no input fields, which gets a bit awkward:
// For WORKFLOW_INPUTS: keys() only contains `VALUES` if at least one value is provided
// For JSON_EXAMPLE: We remove all whitespace and check if we're left with an empty object. Note that we already error if the example is not valid JSON
displayCondition:
`={{$parameter['${INPUT_SOURCE}'] === '${WORKFLOW_INPUTS}' && !$parameter['${WORKFLOW_INPUTS}'].keys().length ` +
`|| $parameter['${INPUT_SOURCE}'] === '${JSON_EXAMPLE}' && $parameter['${JSON_EXAMPLE}'].toString().replaceAll(' ', '').replaceAll('\\n', '') === '{}' }}`,
whenToDisplay: 'always',
location: 'ndv',
},
],
properties: [
{
displayName: 'Events',
name: 'events',
type: 'hidden',
noDataExpression: true,
options: [
{
name: 'Workflow Call',
value: 'worklfow_call',
description: 'When called by another workflow using Execute Workflow Trigger',
action: 'When Called by Another Workflow',
},
],
default: 'worklfow_call',
},
{
displayName:
"When an execute workflow node calls this workflow, the execution starts here. Any data passed into the 'execute workflow' node will be output by this node.",
name: 'notice',
type: 'notice',
default: '',
displayOptions: {
show: { '@version': [{ _cnd: { eq: 1 } }] },
},
},
{
displayName: 'This node is out of date. Please upgrade by removing it and adding a new one',
name: 'outdatedVersionWarning',
type: 'notice',
displayOptions: { show: { '@version': [{ _cnd: { eq: 1 } }] } },
default: '',
},
{
// eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased
displayName: 'Input data mode',
name: INPUT_SOURCE,
type: 'options',
options: [
{
// eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased
name: 'Define using fields below',
value: WORKFLOW_INPUTS,
description: 'Provide input fields via UI',
},
{
// eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased
name: 'Define using JSON example',
value: JSON_EXAMPLE,
description: 'Generate a schema from an example JSON object',
},
{
// eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased
name: 'Accept all data',
value: PASSTHROUGH,
description: 'Use all incoming data from the parent workflow',
},
],
default: WORKFLOW_INPUTS,
noDataExpression: true,
displayOptions: {
show: { '@version': [{ _cnd: { gte: 1.1 } }] },
},
},
{
displayName:
'Provide an example object to infer fields and their types.<br>To allow any type for a given field, set the value to null.',
name: `${JSON_EXAMPLE}_notice`,
type: 'notice',
default: '',
displayOptions: {
show: { '@version': [{ _cnd: { gte: 1.1 } }], inputSource: [JSON_EXAMPLE] },
},
},
{
displayName: 'JSON Example',
name: JSON_EXAMPLE,
type: 'json',
default: JSON.stringify(
{
aField: 'a string',
aNumber: 123,
thisFieldAcceptsAnyType: null,
anArray: [],
},
null,
2,
),
noDataExpression: true,
displayOptions: {
show: { '@version': [{ _cnd: { gte: 1.1 } }], inputSource: [JSON_EXAMPLE] },
},
},
{
displayName: 'Workflow Inputs',
name: WORKFLOW_INPUTS,
placeholder: 'Add field',
type: 'fixedCollection',
description:
'Define expected input fields. If no inputs are provided, all data from the calling workflow will be passed through.',
typeOptions: {
multipleValues: true,
sortable: true,
minRequiredFields: 1,
},
displayOptions: {
show: { '@version': [{ _cnd: { gte: 1.1 } }], inputSource: [WORKFLOW_INPUTS] },
},
default: {},
options: [
{
name: VALUES,
displayName: 'Values',
values: [
{
displayName: 'Name',
name: 'name',
type: 'string',
default: '',
placeholder: 'e.g. fieldName',
description: 'Name of the field',
required: true,
noDataExpression: true,
},
{
displayName: 'Type',
name: 'type',
type: 'options',
description: 'The field value type',
options: TYPE_OPTIONS,
required: true,
default: 'string',
noDataExpression: true,
},
],
},
],
},
],
};
async execute(this: IExecuteFunctions) {
const inputData = this.getInputData();
const inputSource = this.getNodeParameter(INPUT_SOURCE, 0, PASSTHROUGH) as string;
// Note on the data we receive from ExecuteWorkflow caller:
//
// The ExecuteWorkflow node typechecks all fields explicitly provided by the user here via the resourceMapper
// and removes all fields that are in the schema, but `removed` in the resourceMapper.
//
// In passthrough and legacy node versions, inputData will line up since the resourceMapper is empty,
// in which case all input is passed through.
// In other cases we will already have matching types and fields provided by the resource mapper,
// so we just need to be permissive on this end,
// while ensuring we provide default values for fields in our schema, which are removed in the resourceMapper.
if (inputSource === PASSTHROUGH) {
return [inputData];
} else {
const newParams = getFieldEntries(this);
const newKeys = new Set(newParams.map((x) => x.name));
const itemsInSchema: INodeExecutionData[] = inputData.map((row, index) => ({
json: {
...Object.fromEntries(newParams.map((x) => [x.name, FALLBACK_DEFAULT_VALUE])),
// Need to trim to the expected schema to support legacy Execute Workflow callers passing through all their data
// which we do not want to expose past this node.
..._.pickBy(row.json, (_value, key) => newKeys.has(key)),
},
index,
}));
return [itemsInSchema];
}
}
}