feat: Human in the loop (#10675)

Co-authored-by: Giulio Andreini <g.andreini@gmail.com>
This commit is contained in:
Michael Kret
2024-10-07 16:45:22 +03:00
committed by GitHub
parent d2713ae50a
commit 41228b472d
24 changed files with 1298 additions and 196 deletions

View File

@@ -1,5 +1,11 @@
import type express from 'express';
import { NodeHelpers, Workflow } from 'n8n-workflow';
import {
type INodes,
type IWorkflowBase,
NodeHelpers,
SEND_AND_WAIT_OPERATION,
Workflow,
} from 'n8n-workflow';
import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@@ -42,6 +48,29 @@ export class WaitingWebhooks implements IWebhookManager {
execution.data.executionData!.nodeExecutionStack[0].node.disabled = true;
}
private isSendAndWaitRequest(nodes: INodes, suffix: string | undefined) {
return (
suffix &&
Object.keys(nodes).some(
(node) =>
nodes[node].id === suffix && nodes[node].parameters.operation === SEND_AND_WAIT_OPERATION,
)
);
}
private getWorkflow(workflowData: IWorkflowBase) {
return new Workflow({
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
}
async executeWebhook(
req: WaitingWebhookRequest,
res: express.Response,
@@ -66,10 +95,21 @@ export class WaitingWebhooks implements IWebhookManager {
throw new ConflictError(`The execution "${executionId} is running already.`);
}
if (execution.finished || execution.data.resultData.error) {
if (execution.data?.resultData?.error) {
throw new ConflictError(`The execution "${executionId} has finished already.`);
}
if (execution.finished) {
const { workflowData } = execution;
const { nodes } = this.getWorkflow(workflowData);
if (this.isSendAndWaitRequest(nodes, suffix)) {
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
return { noWebhookResponse: true };
} else {
throw new ConflictError(`The execution "${executionId} has finished already.`);
}
}
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
// Set the node as disabled so that the data does not get executed again as it would result
@@ -83,17 +123,7 @@ export class WaitingWebhooks implements IWebhookManager {
execution.data.resultData.runData[lastNodeExecuted].pop();
const { workflowData } = execution;
const workflow = new Workflow({
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
const workflow = this.getWorkflow(workflowData);
const workflowStartNode = workflow.getNode(lastNodeExecuted);
if (workflowStartNode === null) {
@@ -116,8 +146,13 @@ export class WaitingWebhooks implements IWebhookManager {
if (webhookData === undefined) {
// If no data got found it means that the execution can not be started via a webhook.
// Return 404 because we do not want to give any data if the execution exists or not.
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
throw new NotFoundError(errorMessage);
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
return { noWebhookResponse: true };
} else {
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
throw new NotFoundError(errorMessage);
}
}
const runExecutionData = execution.data;