feat: Add Chat Trigger node (#7409)

Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
Co-authored-by: Jesper Bylund <mail@jesperbylund.com>
Co-authored-by: OlegIvaniv <me@olegivaniv.com>
Co-authored-by: Deborah <deborah@starfallprojects.co.uk>
Co-authored-by: Jan Oberhauser <janober@users.noreply.github.com>
Co-authored-by: Jon <jonathan.bennetts@gmail.com>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
Co-authored-by: Michael Kret <88898367+michael-radency@users.noreply.github.com>
Co-authored-by: Giulio Andreini <andreini@netseven.it>
Co-authored-by: Mason Geloso <Mason.geloso@gmail.com>
Co-authored-by: Mason Geloso <hone@Masons-Mac-mini.local>
Co-authored-by: Mutasem Aldmour <mutasem@n8n.io>
This commit is contained in:
Alex Grozav
2024-01-09 13:11:39 +02:00
committed by GitHub
parent 1387541e33
commit af49e95cc7
90 changed files with 2671 additions and 668 deletions

View File

@@ -2514,6 +2514,197 @@ const addExecutionDataFunctions = async (
}
};
async function getInputConnectionData(
this: IAllExecuteFunctions,
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData | undefined,
mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
inputName: ConnectionTypes,
itemIndex: number,
// TODO: Not implemented yet, and maybe also not needed
inputIndex?: number,
): Promise<unknown> {
const node = this.getNode();
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description);
let inputConfiguration = inputs.find((input) => {
if (typeof input === 'string') {
return input === inputName;
}
return input.type === inputName;
});
if (inputConfiguration === undefined) {
throw new ApplicationError('Node does not have input of type', {
extra: { nodeName: node.name, inputName },
});
}
if (typeof inputConfiguration === 'string') {
inputConfiguration = {
type: inputConfiguration,
} as INodeInputConfiguration;
}
const parentNodes = workflow.getParentNodes(node.name, inputName, 1);
if (parentNodes.length === 0) {
return inputConfiguration.maxConnections === 1 ? undefined : [];
}
const constParentNodes = parentNodes
.map((nodeName) => {
return workflow.getNode(nodeName) as INode;
})
.filter((connectedNode) => connectedNode.disabled !== true)
.map(async (connectedNode) => {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
connectedNode.type,
connectedNode.typeVersion,
);
if (!nodeType.supplyData) {
throw new ApplicationError('Node does not have a `supplyData` method defined', {
extra: { nodeName: connectedNode.name },
});
}
const context = Object.assign({}, this);
context.getNodeParameter = (
parameterName: string,
itemIndex: number,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
) => {
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
connectedNode,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
{ ...(options || {}), contextNode: node },
) as any;
};
// TODO: Check what else should be overwritten
context.getNode = () => {
return deepCopy(connectedNode);
};
context.getCredentials = async (key: string) => {
try {
return await getCredentials(
workflow,
connectedNode,
key,
additionalData,
mode,
executeData,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
);
} catch (error) {
// Display the error on the node which is causing it
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
throw error;
}
};
try {
const response = await nodeType.supplyData.call(context, itemIndex);
if (response.closeFunction) {
closeFunctions.push(response.closeFunction);
}
return response;
} catch (error) {
// Propagate errors from sub-nodes
if (error.functionality === 'configuration-node') throw error;
if (!(error instanceof ExecutionBaseError)) {
error = new NodeOperationError(connectedNode, error, {
itemIndex,
});
}
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
// Display the error on the node which is causing it
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
// Display on the calling node which node has the error
throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, {
itemIndex,
functionality: 'configuration-node',
description: error.message,
});
}
});
// Validate the inputs
const nodes = await Promise.all(constParentNodes);
if (inputConfiguration.required && nodes.length === 0) {
throw new NodeOperationError(node, `A ${inputName} processor node must be connected!`);
}
if (
inputConfiguration.maxConnections !== undefined &&
nodes.length > inputConfiguration.maxConnections
) {
throw new NodeOperationError(
node,
`Only ${inputConfiguration.maxConnections} ${inputName} processor nodes are/is allowed to be connected!`,
);
}
return inputConfiguration.maxConnections === 1
? (nodes || [])[0]?.response
: nodes.map((node) => node.response);
}
const getCommonWorkflowFunctions = (
workflow: Workflow,
node: INode,
@@ -3197,191 +3388,29 @@ export function getExecuteFunctions(
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
async getInputConnectionData(
inputName: ConnectionTypes,
itemIndex: number,
// TODO: Not implemented yet, and maybe also not needed
inputIndex?: number,
): Promise<unknown> {
const node = this.getNode();
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description);
let inputConfiguration = inputs.find((input) => {
if (typeof input === 'string') {
return input === inputName;
}
return input.type === inputName;
});
if (inputConfiguration === undefined) {
throw new ApplicationError('Node does not have input of type', {
extra: { nodeName: node.name, inputName },
});
}
if (typeof inputConfiguration === 'string') {
inputConfiguration = {
type: inputConfiguration,
} as INodeInputConfiguration;
}
const parentNodes = workflow.getParentNodes(node.name, inputName, 1);
if (parentNodes.length === 0) {
return inputConfiguration.maxConnections === 1 ? undefined : [];
}
const constParentNodes = parentNodes
.map((nodeName) => {
return workflow.getNode(nodeName) as INode;
})
.filter((connectedNode) => connectedNode.disabled !== true)
.map(async (connectedNode) => {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
connectedNode.type,
connectedNode.typeVersion,
);
if (!nodeType.supplyData) {
throw new ApplicationError('Node does not have a `supplyData` method defined', {
extra: { nodeName: connectedNode.name },
});
}
const context = Object.assign({}, this);
context.getNodeParameter = (
parameterName: string,
itemIndex: number,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
) => {
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
connectedNode,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
{ ...(options || {}), contextNode: node },
) as any;
};
// TODO: Check what else should be overwritten
context.getNode = () => {
return deepCopy(connectedNode);
};
context.getCredentials = async (key: string) => {
try {
return await getCredentials(
workflow,
connectedNode,
key,
additionalData,
mode,
executeData,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
);
} catch (error) {
// Display the error on the node which is causing it
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
throw error;
}
};
try {
const response = await nodeType.supplyData.call(context, itemIndex);
if (response.closeFunction) {
closeFunctions.push(response.closeFunction);
}
return response;
} catch (error) {
// Propagate errors from sub-nodes
if (error.functionality === 'configuration-node') throw error;
if (!(error instanceof ExecutionBaseError)) {
error = new NodeOperationError(connectedNode, error, {
itemIndex,
});
}
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
// Display the error on the node which is causing it
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
// Display on the calling node which node has the error
throw new NodeOperationError(
connectedNode,
`Error in sub-node ${connectedNode.name}`,
{
itemIndex,
functionality: 'configuration-node',
description: error.message,
},
);
}
});
// Validate the inputs
const nodes = await Promise.all(constParentNodes);
if (inputConfiguration.required && nodes.length === 0) {
throw new NodeOperationError(node, `A ${inputName} processor node must be connected!`);
}
if (
inputConfiguration.maxConnections !== undefined &&
nodes.length > inputConfiguration.maxConnections
) {
throw new NodeOperationError(
node,
`Only ${inputConfiguration.maxConnections} ${inputName} processor nodes are/is allowed to be connected!`,
);
}
return inputConfiguration.maxConnections === 1
? (nodes || [])[0]?.response
: nodes.map((node) => node.response);
return getInputConnectionData.call(
this,
workflow,
runExecutionData,
runIndex,
connectionInputData,
additionalData,
executeData,
mode,
closeFunctions,
inputName,
itemIndex,
inputIndex,
);
},
getNodeOutputs(): INodeOutputConfiguration[] {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => {
@@ -3862,12 +3891,14 @@ export function getExecuteHookFunctions(
/**
* Returns the execute functions regular nodes have access to when webhook-function is defined.
*/
// TODO: check where it is used and make sure close functions are called
export function getExecuteWebhookFunctions(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
webhookData: IWebhookData,
closeFunctions: CloseFunction[],
): IWebhookFunctions {
return ((workflow: Workflow, node: INode) => {
return {
@@ -3885,6 +3916,47 @@ export function getExecuteWebhookFunctions(
}
return additionalData.httpRequest.headers;
},
async getInputConnectionData(
inputName: ConnectionTypes,
itemIndex: number,
// TODO: Not implemented yet, and maybe also not needed
inputIndex?: number,
): Promise<unknown> {
// To be able to use expressions like "$json.sessionId" set the
// body data the webhook received to what is normally used for
// incoming node data.
const connectionInputData: INodeExecutionData[] = [
{ json: additionalData.httpRequest?.body || {} },
];
const runExecutionData: IRunExecutionData = {
resultData: {
runData: {},
},
};
const executeData: IExecuteData = {
data: {
main: [connectionInputData],
},
node,
source: null,
};
const runIndex = 0;
return getInputConnectionData.call(
this,
workflow,
runExecutionData,
runIndex,
connectionInputData,
additionalData,
executeData,
mode,
closeFunctions,
inputName,
itemIndex,
inputIndex,
);
},
getMode: () => mode,
getNodeParameter: (
parameterName: string,