feat(core): Add support for building LLM applications (#7235)

This extracts all core and editor changes from #7246 and #7137, so that
we can get these changes merged first.

ADO-1120

[DB Tests](https://github.com/n8n-io/n8n/actions/runs/6379749011)
[E2E Tests](https://github.com/n8n-io/n8n/actions/runs/6379751480)
[Workflow Tests](https://github.com/n8n-io/n8n/actions/runs/6379752828)

---------

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Alex Grozav <alex@grozav.com>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-10-02 17:33:43 +02:00
committed by GitHub
parent 04dfcd73be
commit 00a4b8b0c6
93 changed files with 6209 additions and 728 deletions

View File

@@ -39,6 +39,8 @@ import pick from 'lodash/pick';
import { extension, lookup } from 'mime-types';
import type {
BinaryHelperFunctions,
ConnectionTypes,
ExecutionError,
FieldType,
FileSystemHelperFunctions,
FunctionsBase,
@@ -66,6 +68,8 @@ import type {
INodeCredentialDescription,
INodeCredentialsDetails,
INodeExecutionData,
INodeInputConfiguration,
INodeOutputConfiguration,
INodeProperties,
INodePropertyCollection,
INodePropertyOptions,
@@ -75,6 +79,7 @@ import type {
IPollFunctions,
IRunExecutionData,
ISourceData,
ITaskData,
ITaskDataConnections,
ITriggerFunctions,
IWebhookData,
@@ -106,6 +111,7 @@ import {
isObjectEmpty,
isResourceMapperValue,
validateFieldType,
ExecutionBaseError,
} from 'n8n-workflow';
import type { Token } from 'oauth-1.0a';
import clientOAuth1 from 'oauth-1.0a';
@@ -2253,6 +2259,9 @@ export function getNodeParameter(
timezone,
additionalKeys,
executeData,
false,
{},
options?.contextNode?.name,
);
cleanupParameterData(returnData);
} catch (e) {
@@ -2380,6 +2389,106 @@ export function getWebhookDescription(
return undefined;
}
// TODO: Change options to an object
const addExecutionDataFunctions = async (
type: 'input' | 'output',
nodeName: string,
data: INodeExecutionData[][] | ExecutionBaseError,
runExecutionData: IRunExecutionData,
connectionType: ConnectionTypes,
additionalData: IWorkflowExecuteAdditionalData,
sourceNodeName: string,
sourceNodeRunIndex: number,
currentNodeRunIndex: number,
): Promise<void> => {
if (connectionType === 'main') {
throw new Error(`Setting the ${type} is not supported for the main connection!`);
}
let taskData: ITaskData | undefined;
if (type === 'input') {
taskData = {
startTime: new Date().getTime(),
executionTime: 0,
executionStatus: 'running',
source: [null],
};
} else {
// At the moment we expect that there is always an input sent before the output
taskData = get(
runExecutionData,
['resultData', 'runData', nodeName, currentNodeRunIndex],
undefined,
);
if (taskData === undefined) {
return;
}
}
taskData = taskData!;
if (data instanceof Error) {
// TODO: Or "failed", what is the difference
taskData.executionStatus = 'error';
taskData.error = data;
} else {
if (type === 'output') {
taskData.executionStatus = 'success';
}
taskData.data = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (type === 'input') {
if (!(data instanceof Error)) {
taskData.inputOverride = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (!runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
runExecutionData.resultData.runData[nodeName] = [];
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteBefore', {
executionId: additionalData.executionId,
nodeName,
});
}
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteAfter', {
executionId: additionalData.executionId,
nodeName,
data: taskData,
});
}
let sourceTaskData = get(runExecutionData, `executionData.metadata[${sourceNodeName}]`);
if (!sourceTaskData) {
runExecutionData.executionData!.metadata[sourceNodeName] = [];
sourceTaskData = runExecutionData.executionData!.metadata[sourceNodeName];
}
if (!sourceTaskData[sourceNodeRunIndex]) {
sourceTaskData[sourceNodeRunIndex] = {
subRun: [],
};
}
sourceTaskData[sourceNodeRunIndex]!.subRun!.push({
node: nodeName,
runIndex: currentNodeRunIndex,
});
}
};
const getCommonWorkflowFunctions = (
workflow: Workflow,
node: INode,
@@ -2787,6 +2896,192 @@ export function getExecuteFunctions(
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
async getInputConnectionData(
inputName: ConnectionTypes,
itemIndex: number,
// TODO: Not implemented yet, and maybe also not needed
inputIndex?: number,
): Promise<unknown> {
const node = this.getNode();
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description);
let inputConfiguration = inputs.find((input) => {
if (typeof input === 'string') {
return input === inputName;
}
return input.type === inputName;
});
if (inputConfiguration === undefined) {
throw new Error(`The node "${node.name}" does not have an input of type "${inputName}"`);
}
if (typeof inputConfiguration === 'string') {
inputConfiguration = {
type: inputConfiguration,
} as INodeInputConfiguration;
}
const parentNodes = workflow.getParentNodes(node.name, inputName, 1);
if (parentNodes.length === 0) {
return inputConfiguration.maxConnections === 1 ? undefined : [];
}
const constParentNodes = parentNodes
.map((nodeName) => {
return workflow.getNode(nodeName) as INode;
})
.filter((connectedNode) => connectedNode.disabled !== true)
.map(async (connectedNode) => {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
connectedNode.type,
connectedNode.typeVersion,
);
if (!nodeType.supplyData) {
throw new Error(
`The node "${connectedNode.name}" does not have a "supplyData" method defined!`,
);
}
const context = Object.assign({}, this);
context.getNodeParameter = (
parameterName: string,
itemIndex: number,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
) => {
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
connectedNode,
parameterName,
itemIndex,
mode,
additionalData.timezone,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
{ ...(options || {}), contextNode: node },
) as any;
};
// TODO: Check what else should be overwritten
context.getNode = () => {
return deepCopy(connectedNode);
};
context.getCredentials = async (key: string) => {
try {
return await getCredentials(
workflow,
connectedNode,
key,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
);
} catch (error) {
// Display the error on the node which is causing it
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
throw error;
}
};
try {
return await nodeType.supplyData.call(context);
} catch (error) {
if (!(error instanceof ExecutionBaseError)) {
error = new NodeOperationError(connectedNode, error, {
itemIndex,
});
}
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
// Display the error on the node which is causing it
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
// Display on the calling node which node has the error
throw new NodeOperationError(
connectedNode,
`Error on node "${connectedNode.name}" which is connected via input "${inputName}"`,
{
itemIndex,
},
);
}
});
// Validate the inputs
const nodes = await Promise.all(constParentNodes);
if (inputConfiguration.required && nodes.length === 0) {
throw new NodeOperationError(node, `A ${inputName} processor node must be connected!`);
}
if (
inputConfiguration.maxConnections !== undefined &&
nodes.length > inputConfiguration.maxConnections
) {
throw new NodeOperationError(
node,
`Only ${inputConfiguration.maxConnections} ${inputName} processor nodes are/is allowed to be connected!`,
);
}
return inputConfiguration.maxConnections === 1
? (nodes || [])[0]?.response
: nodes.map((node) => node.response);
},
getNodeOutputs(): INodeOutputConfiguration[] {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => {
if (typeof output === 'string') {
return {
type: output,
};
}
return output;
});
},
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
@@ -2863,7 +3158,7 @@ export function getExecuteFunctions(
return;
}
try {
if (additionalData.sendMessageToUI) {
if (additionalData.sendDataToUI) {
args = args.map((arg) => {
// prevent invalid dates from being logged as null
if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg };
@@ -2875,7 +3170,10 @@ export function getExecuteFunctions(
return arg;
});
additionalData.sendMessageToUI(node.name, args);
additionalData.sendDataToUI('sendConsoleMessage', {
source: `[Node: "${node.name}"]`,
messages: args,
});
}
} catch (error) {
Logger.warn(`There was a problem sending message to UI: ${error.message}`);
@@ -2884,6 +3182,60 @@ export function getExecuteFunctions(
async sendResponse(response: IExecuteResponsePromiseData): Promise<void> {
await additionalData.hooks?.executeHookFunctions('sendResponse', [response]);
},
addInputData(
connectionType: ConnectionTypes,
data: INodeExecutionData[][] | ExecutionError,
): { index: number } {
const nodeName = this.getNode().name;
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length;
}
addExecutionDataFunctions(
'input',
this.getNode().name,
data,
runExecutionData,
connectionType,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
).catch((error) => {
Logger.warn(
`There was a problem logging input data of node "${this.getNode().name}": ${
error.message
}`,
);
});
return { index: currentNodeRunIndex };
},
addOutputData(
connectionType: ConnectionTypes,
currentNodeRunIndex: number,
data: INodeExecutionData[][] | ExecutionError,
): void {
addExecutionDataFunctions(
'output',
this.getNode().name,
data,
runExecutionData,
connectionType,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
).catch((error) => {
Logger.warn(
`There was a problem logging output data of node "${this.getNode().name}": ${
error.message
}`,
);
});
},
helpers: {
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),

View File

@@ -23,6 +23,7 @@ import type {
ITaskData,
ITaskDataConnections,
ITaskDataConnectionsSource,
ITaskMetadata,
IWaitingForExecution,
IWaitingForExecutionSource,
NodeApiError,
@@ -65,6 +66,7 @@ export class WorkflowExecute {
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
@@ -133,6 +135,7 @@ export class WorkflowExecute {
executionData: {
contextData: {},
nodeExecutionStack,
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
@@ -160,7 +163,7 @@ export class WorkflowExecute {
workflow: Workflow,
runData: IRunData,
startNodes: string[],
destinationNode: string,
destinationNode?: string,
pinData?: IPinData,
): PCancelable<IRun> {
let incomingNodeConnections: INodeConnections | undefined;
@@ -169,6 +172,7 @@ export class WorkflowExecute {
this.status = 'running';
const runIndex = 0;
let runNodeFilter: string[] | undefined;
// Initialize the nodeExecutionStack and waitingExecution with
// the data from runData
@@ -182,7 +186,6 @@ export class WorkflowExecute {
let incomingSourceData: ITaskDataConnectionsSource | null = null;
if (incomingNodeConnections === undefined) {
// If it has no incoming data add the default empty data
incomingData.push([
{
json: {},
@@ -202,6 +205,9 @@ export class WorkflowExecute {
if (node && pinData && pinData[node.name]) {
incomingData.push(pinData[node.name]);
} else {
if (!runData[connection.node]) {
continue;
}
const nodeIncomingData =
runData[connection.node][runIndex]?.data?.[connection.type][connection.index];
if (nodeIncomingData) {
@@ -226,56 +232,57 @@ export class WorkflowExecute {
nodeExecutionStack.push(executeData);
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
incomingNodeConnections = workflow.connectionsByDestinationNode[destinationNode];
if (incomingNodeConnections !== undefined) {
for (const connections of incomingNodeConnections.main) {
for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
connection = connections[inputIndex];
if (destinationNode) {
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
incomingNodeConnections = workflow.connectionsByDestinationNode[destinationNode];
if (incomingNodeConnections !== undefined) {
for (const connections of incomingNodeConnections.main) {
for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
connection = connections[inputIndex];
if (waitingExecution[destinationNode] === undefined) {
waitingExecution[destinationNode] = {};
waitingExecutionSource[destinationNode] = {};
}
if (waitingExecution[destinationNode][runIndex] === undefined) {
waitingExecution[destinationNode][runIndex] = {};
waitingExecutionSource[destinationNode][runIndex] = {};
}
if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) {
waitingExecution[destinationNode][runIndex][connection.type] = [];
waitingExecutionSource[destinationNode][runIndex][connection.type] = [];
}
if (waitingExecution[destinationNode] === undefined) {
waitingExecution[destinationNode] = {};
waitingExecutionSource[destinationNode] = {};
}
if (waitingExecution[destinationNode][runIndex] === undefined) {
waitingExecution[destinationNode][runIndex] = {};
waitingExecutionSource[destinationNode][runIndex] = {};
}
if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) {
waitingExecution[destinationNode][runIndex][connection.type] = [];
waitingExecutionSource[destinationNode][runIndex][connection.type] = [];
}
if (runData[connection.node] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNode][runIndex][connection.type].push(
runData[connection.node][runIndex].data![connection.type][connection.index],
);
waitingExecutionSource[destinationNode][runIndex][connection.type].push({
previousNode: connection.node,
previousNodeOutput: connection.index || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNode][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNode][runIndex][connection.type].push(null);
if (runData[connection.node] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNode][runIndex][connection.type].push(
runData[connection.node][runIndex].data![connection.type][connection.index],
);
waitingExecutionSource[destinationNode][runIndex][connection.type].push({
previousNode: connection.node,
previousNodeOutput: connection.index || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNode][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNode][runIndex][connection.type].push(null);
}
}
}
}
// Only run the parent nodes and no others
// eslint-disable-next-line prefer-const
runNodeFilter = workflow
.getParentNodes(destinationNode)
.filter((parentNodeName) => !workflow.getNode(parentNodeName)?.disabled);
runNodeFilter.push(destinationNode);
}
}
// Only run the parent nodes and no others
let runNodeFilter: string[] | undefined;
// eslint-disable-next-line prefer-const
runNodeFilter = workflow
.getParentNodes(destinationNode)
.filter((parentNodeName) => !workflow.getNode(parentNodeName)?.disabled);
runNodeFilter.push(destinationNode);
this.runExecutionData = {
startData: {
destinationNode,
@@ -288,6 +295,7 @@ export class WorkflowExecute {
executionData: {
contextData: {},
nodeExecutionStack,
metadata: {},
waitingExecution,
waitingExecutionSource,
},
@@ -309,6 +317,22 @@ export class WorkflowExecute {
return this.additionalData.hooks.executeHookFunctions(hookName, parameters);
}
moveNodeMetadata(): void {
const metadata = get(this.runExecutionData, 'executionData.metadata');
if (metadata) {
const runData = get(this.runExecutionData, 'resultData.runData');
let index: number;
let metaRunData: ITaskMetadata;
for (const nodeName of Object.keys(metadata)) {
for ([index, metaRunData] of metadata[nodeName].entries()) {
runData[nodeName][index].metadata = metaRunData;
}
}
}
}
/**
* Checks the incoming connection does not receive any data
*/
@@ -1533,6 +1557,9 @@ export class WorkflowExecute {
// Static data of workflow changed
newStaticData = workflow.staticData;
}
this.moveNodeMetadata();
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(
// eslint-disable-next-line @typescript-eslint/no-shadow
(error) => {
@@ -1601,6 +1628,9 @@ export class WorkflowExecute {
// Static data of workflow changed
newStaticData = workflow.staticData;
}
this.moveNodeMetadata();
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
if (closeFunction) {