Add Activation Trigger (#1570)

*  n8n start trigger node

* first declaration of WorkflowActivationMode

* implement first WorkflowActivationMode: 'init', 'create', 'update', 'activate'

* fix Server missing id

* add activation infos to triggers

* remove WorkflowActivationMode from webhook execution function

* add some missing activation and add manual activation

* clean up and fix some code

* fix UnhandledPromiseRejectionWarning: Error: Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function!

* fix spaces

* use a better name for the node

* fix ident in package.json

* Contributions to lublak's PR #1287

* Fixed linting issues and change the way parameters are displayed

*  Fix name and minor improvements

Co-authored-by: lublak <lublak.de@gmail.com>
Co-authored-by: lublak <44057030+lublak@users.noreply.github.com>
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue
2021-03-23 19:08:47 +01:00
committed by GitHub
parent 11fb97223c
commit 726a99bf69
10 changed files with 134 additions and 41 deletions

View File

@@ -31,6 +31,7 @@ import {
NodeHelpers,
WebhookHttpMethod,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
@@ -66,7 +67,7 @@ export class ActiveWorkflowRunner {
for (const workflowData of workflowsData) {
console.log(` - ${workflowData.name}`);
try {
await this.add(workflowData.id.toString(), workflowData);
await this.add(workflowData.id.toString(), 'init', workflowData);
console.log(` => Started`);
} catch (error) {
console.log(` => ERROR: Workflow could not be activated:`);
@@ -273,7 +274,7 @@ export class ActiveWorkflowRunner {
* @returns {Promise<void>}
* @memberof ActiveWorkflowRunner
*/
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, activation: WorkflowActivateMode): Promise<void> {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
let path = '' as string | undefined;
@@ -319,10 +320,10 @@ export class ActiveWorkflowRunner {
await Db.collections.Webhook?.insert(webhook);
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, activation, false);
if (webhookExists !== true) {
// If webhook does not exist yet create it
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false);
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, activation, false);
}
} catch (error) {
@@ -378,7 +379,7 @@ export class ActiveWorkflowRunner {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
for (const webhookData of webhooks) {
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, 'update', false);
}
await WorkflowHelpers.saveStaticData(workflow);
@@ -446,9 +447,9 @@ export class ActiveWorkflowRunner {
* @returns {IGetExecutePollFunctions}
* @memberof ActiveWorkflowRunner
*/
getExecutePollFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecutePollFunctions {
getExecutePollFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, activation: WorkflowActivateMode): IGetExecutePollFunctions {
return ((workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(workflow, node, additionalData, mode);
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(workflow, node, additionalData, mode, activation);
returnFunctions.__emit = (data: INodeExecutionData[][]): void => {
this.runWorkflow(workflowData, node, data, additionalData, mode);
};
@@ -467,9 +468,9 @@ export class ActiveWorkflowRunner {
* @returns {IGetExecuteTriggerFunctions}
* @memberof ActiveWorkflowRunner
*/
getExecuteTriggerFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecuteTriggerFunctions{
getExecuteTriggerFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, activation: WorkflowActivateMode): IGetExecuteTriggerFunctions{
return ((workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode);
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode, activation);
returnFunctions.emit = (data: INodeExecutionData[][]): void => {
WorkflowHelpers.saveStaticData(workflow);
this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => console.error(err));
@@ -486,7 +487,7 @@ export class ActiveWorkflowRunner {
* @returns {Promise<void>}
* @memberof ActiveWorkflowRunner
*/
async add(workflowId: string, workflowData?: IWorkflowDb): Promise<void> {
async add(workflowId: string, activation: WorkflowActivateMode, workflowData?: IWorkflowDb): Promise<void> {
if (this.activeWorkflows === null) {
throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);
}
@@ -511,15 +512,15 @@ export class ActiveWorkflowRunner {
const mode = 'trigger';
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode);
const getPollFunctions = this.getExecutePollFunctions(workflowData, additionalData, mode);
const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode, activation);
const getPollFunctions = this.getExecutePollFunctions(workflowData, additionalData, mode, activation);
// Add the workflows which have webhooks defined
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode, activation);
if (workflowInstance.getTriggerNodes().length !== 0
|| workflowInstance.getPollNodes().length !== 0) {
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, mode, activation, getTriggerFunctions, getPollFunctions);
}
if (this.activationErrors[workflowId] !== undefined) {

View File

@@ -624,7 +624,7 @@ class App {
try {
await this.externalHooks.run('workflow.activate', [responseData]);
await this.activeWorkflowRunner.add(id);
await this.activeWorkflowRunner.add(id, isActive ? 'update' : 'activate');
} catch (error) {
// If workflow could not be activated set it again to inactive
newWorkflowData.active = false;
@@ -670,6 +670,7 @@ class App {
const startNodes: string[] | undefined = req.body.startNodes;
const destinationNode: string | undefined = req.body.destinationNode;
const executionMode = 'manual';
const activationMode = 'manual';
const sessionId = GenericHelpers.getSessionId(req);
@@ -679,7 +680,7 @@ class App {
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const nodeTypes = NodeTypes();
const workflowInstance = new Workflow({ id: workflowData.id, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: false, nodeTypes, staticData: undefined, settings: workflowData.settings });
const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, sessionId, destinationNode);
const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, activationMode, sessionId, destinationNode);
if (needsWebhook === true) {
return {
waitingForWebhook: true,

View File

@@ -17,6 +17,7 @@ import {
IWorkflowExecuteAdditionalData,
WebhookHttpMethod,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
@@ -161,7 +162,7 @@ export class TestWebhooks {
* @returns {(Promise<IExecutionDb | undefined>)}
* @memberof TestWebhooks
*/
async needsWebhookData(workflowData: IWorkflowDb, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, sessionId?: string, destinationNode?: string): Promise<boolean> {
async needsWebhookData(workflowData: IWorkflowDb, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, sessionId?: string, destinationNode?: string): Promise<boolean> {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, destinationNode);
if (webhooks.length === 0) {
@@ -193,7 +194,7 @@ export class TestWebhooks {
};
try {
await this.activeWebhooks!.add(workflow, webhookData, mode);
await this.activeWebhooks!.add(workflow, webhookData, mode, activation);
} catch (error) {
activatedKey.forEach(deleteKey => delete this.testWebhookData[deleteKey] );
await this.activeWebhooks!.removeWorkflow(workflow);