mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 10:31:15 +00:00
Done
This commit is contained in:
@@ -11,11 +11,11 @@ import {
|
||||
WorkflowHelpers,
|
||||
WorkflowRunner,
|
||||
WorkflowExecuteAdditionalData,
|
||||
IWebhookDb,
|
||||
} from './';
|
||||
|
||||
import {
|
||||
ActiveWorkflows,
|
||||
ActiveWebhooks,
|
||||
NodeExecuteFunctions,
|
||||
} from 'n8n-core';
|
||||
|
||||
@@ -26,7 +26,7 @@ import {
|
||||
INode,
|
||||
INodeExecutionData,
|
||||
IRunExecutionData,
|
||||
IWebhookData,
|
||||
NodeHelpers,
|
||||
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
|
||||
WebhookHttpMethod,
|
||||
Workflow,
|
||||
@@ -38,19 +38,21 @@ import * as express from 'express';
|
||||
|
||||
export class ActiveWorkflowRunner {
|
||||
private activeWorkflows: ActiveWorkflows | null = null;
|
||||
private activeWebhooks: ActiveWebhooks | null = null;
|
||||
|
||||
private activationErrors: {
|
||||
[key: string]: IActivationError;
|
||||
} = {};
|
||||
|
||||
|
||||
async init() {
|
||||
|
||||
// Get the active workflows from database
|
||||
|
||||
// NOTE
|
||||
// Here I guess we can have a flag on the workflow table like hasTrigger
|
||||
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
|
||||
const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];
|
||||
|
||||
this.activeWebhooks = new ActiveWebhooks();
|
||||
|
||||
// Add them as active workflows
|
||||
this.activeWorkflows = new ActiveWorkflows();
|
||||
|
||||
if (workflowsData.length !== 0) {
|
||||
@@ -58,20 +60,27 @@ export class ActiveWorkflowRunner {
|
||||
console.log(' Start Active Workflows:');
|
||||
console.log(' ================================');
|
||||
|
||||
const nodeTypes = NodeTypes();
|
||||
|
||||
for (const workflowData of workflowsData) {
|
||||
console.log(` - ${workflowData.name}`);
|
||||
try {
|
||||
await this.add(workflowData.id.toString(), workflowData);
|
||||
console.log(` => Started`);
|
||||
} catch (error) {
|
||||
console.log(` => ERROR: Workflow could not be activated:`);
|
||||
console.log(` ${error.message}`);
|
||||
|
||||
const workflow = new Workflow({ id: workflowData.id.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
|
||||
|
||||
if (workflow.getTriggerNodes().length !== 0
|
||||
|| workflow.getPollNodes().length !== 0) {
|
||||
console.log(` - ${workflowData.name}`);
|
||||
try {
|
||||
await this.add(workflowData.id.toString(), workflowData);
|
||||
console.log(` => Started`);
|
||||
} catch (error) {
|
||||
console.log(` => ERROR: Workflow could not be activated:`);
|
||||
console.log(` ${error.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Removes all the currently active workflows
|
||||
*
|
||||
@@ -94,7 +103,6 @@ export class ActiveWorkflowRunner {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks if a webhook for the given method and path exists and executes the workflow.
|
||||
*
|
||||
@@ -110,30 +118,41 @@ export class ActiveWorkflowRunner {
|
||||
throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
|
||||
}
|
||||
|
||||
const webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
|
||||
const webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;
|
||||
|
||||
if (webhookData === undefined) {
|
||||
// check if something exist
|
||||
if (webhook === undefined) {
|
||||
// The requested webhook is not registered
|
||||
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
|
||||
}
|
||||
|
||||
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId);
|
||||
const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId);
|
||||
if (workflowData === undefined) {
|
||||
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404);
|
||||
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhook.workflowId}"`, 404, 404);
|
||||
}
|
||||
|
||||
const nodeTypes = NodeTypes();
|
||||
const workflow = new Workflow({ id: webhookData.workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
|
||||
const workflow = new Workflow({ id: webhook.workflowId.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
|
||||
|
||||
const credentials = await WorkflowCredentials([workflow.getNode(webhook.node as string) as INode]);
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
|
||||
|
||||
const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(webhook.node as string) as INode, additionalData).filter((webhook) => {
|
||||
return (webhook.httpMethod === httpMethod && webhook.path === path);
|
||||
})[0];
|
||||
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
|
||||
if (workflowStartNode === null) {
|
||||
throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
//@ts-ignore
|
||||
WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
@@ -143,19 +162,14 @@ export class ActiveWorkflowRunner {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the ids of the currently active workflows
|
||||
*
|
||||
* @returns {string[]}
|
||||
* @memberof ActiveWorkflowRunner
|
||||
*/
|
||||
getActiveWorkflows(): string[] {
|
||||
if (this.activeWorkflows === null) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return this.activeWorkflows.allActiveWorkflows();
|
||||
getActiveWorkflows(): Promise<IWorkflowDb[]> {
|
||||
return Db.collections.Workflow?.find({ select: ['id'] }) as Promise<IWorkflowDb[]>;
|
||||
}
|
||||
|
||||
|
||||
@@ -166,15 +180,11 @@ export class ActiveWorkflowRunner {
|
||||
* @returns {boolean}
|
||||
* @memberof ActiveWorkflowRunner
|
||||
*/
|
||||
isActive(id: string): boolean {
|
||||
if (this.activeWorkflows !== null) {
|
||||
return this.activeWorkflows.isActive(id);
|
||||
}
|
||||
|
||||
return false;
|
||||
async isActive(id: string): Promise<boolean> {
|
||||
const workflow = await Db.collections.Workflow?.findOne({ id }) as IWorkflowDb;
|
||||
return workflow?.active as boolean;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return error if there was a problem activating the workflow
|
||||
*
|
||||
@@ -190,7 +200,6 @@ export class ActiveWorkflowRunner {
|
||||
return this.activationErrors[id];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds all the webhooks of the workflow
|
||||
*
|
||||
@@ -202,12 +211,47 @@ export class ActiveWorkflowRunner {
|
||||
*/
|
||||
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
|
||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
|
||||
let path = '';
|
||||
|
||||
for (const webhookData of webhooks) {
|
||||
await this.activeWebhooks!.add(workflow, webhookData, mode);
|
||||
// Save static data!
|
||||
await WorkflowHelpers.saveStaticData(workflow);
|
||||
|
||||
const node = workflow.getNode(webhookData.node) as INode;
|
||||
node.name = webhookData.node;
|
||||
|
||||
path = node.parameters.path as string;
|
||||
|
||||
if (node.parameters.path === undefined) {
|
||||
path = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['path'], 'GET') as string;
|
||||
}
|
||||
|
||||
const webhook = {
|
||||
workflowId: webhookData.workflowId,
|
||||
webhookPath: NodeHelpers.getNodeWebhookPath(workflow.id as string, node, path),
|
||||
node: node.name,
|
||||
method: webhookData.httpMethod,
|
||||
} as IWebhookDb;
|
||||
|
||||
try {
|
||||
await Db.collections.Webhook?.insert(webhook);
|
||||
|
||||
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
|
||||
if (webhookExists === false) {
|
||||
// If webhook does not exist yet create it
|
||||
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
// The workflow was saved with two webhooks with the
|
||||
// same path/method so delete all webhooks saved
|
||||
|
||||
await Db.collections.Webhook?.delete({ workflowId: workflow.id });
|
||||
|
||||
// then show error to the user
|
||||
throw new Error(error.message || error.detail);
|
||||
}
|
||||
}
|
||||
// Save static data!
|
||||
await WorkflowHelpers.saveStaticData(workflow);
|
||||
}
|
||||
|
||||
|
||||
@@ -227,10 +271,22 @@ export class ActiveWorkflowRunner {
|
||||
const nodeTypes = NodeTypes();
|
||||
const workflow = new Workflow({ id: workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings });
|
||||
|
||||
await this.activeWebhooks!.removeWorkflow(workflow);
|
||||
const mode = 'internal';
|
||||
|
||||
// Save the static workflow data if needed
|
||||
await WorkflowHelpers.saveStaticData(workflow);
|
||||
const credentials = await WorkflowCredentials(workflowData.nodes);
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
|
||||
|
||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
|
||||
|
||||
for (const webhookData of webhooks) {
|
||||
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
|
||||
}
|
||||
|
||||
const webhook = {
|
||||
workflowId: workflowData.id,
|
||||
} as IWebhookDb;
|
||||
|
||||
await Db.collections.Webhook?.delete(webhook);
|
||||
}
|
||||
|
||||
|
||||
@@ -322,7 +378,6 @@ export class ActiveWorkflowRunner {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Makes a workflow active
|
||||
*
|
||||
@@ -361,7 +416,11 @@ export class ActiveWorkflowRunner {
|
||||
|
||||
// Add the workflows which have webhooks defined
|
||||
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);
|
||||
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
|
||||
|
||||
if (workflowInstance.getTriggerNodes().length !== 0
|
||||
|| workflowInstance.getPollNodes().length !== 0) {
|
||||
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
|
||||
}
|
||||
|
||||
if (this.activationErrors[workflowId] !== undefined) {
|
||||
// If there were activation errors delete them
|
||||
@@ -386,7 +445,6 @@ export class ActiveWorkflowRunner {
|
||||
await WorkflowHelpers.saveStaticData(workflowInstance!);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Makes a workflow inactive
|
||||
*
|
||||
@@ -395,6 +453,7 @@ export class ActiveWorkflowRunner {
|
||||
* @memberof ActiveWorkflowRunner
|
||||
*/
|
||||
async remove(workflowId: string): Promise<void> {
|
||||
|
||||
if (this.activeWorkflows !== null) {
|
||||
// Remove all the webhooks of the workflow
|
||||
await this.removeWorkflowWebhooks(workflowId);
|
||||
@@ -404,8 +463,13 @@ export class ActiveWorkflowRunner {
|
||||
delete this.activationErrors[workflowId];
|
||||
}
|
||||
|
||||
// Remove the workflow from the "list" of active workflows
|
||||
return this.activeWorkflows.remove(workflowId);
|
||||
// if it's active in memory then it's a trigger
|
||||
// so remove from list of actives workflows
|
||||
if (this.activeWorkflows.isActive(workflowId)) {
|
||||
this.activeWorkflows.remove(workflowId);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);
|
||||
|
||||
Reference in New Issue
Block a user