From c857e42677ef0d415caf66f00d7af029546dfd79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 7 Nov 2023 13:48:48 +0100 Subject: [PATCH] feat(core): Coordinate workflow activation in multiple main scenario in internal API (#7566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Story: https://linear.app/n8n/issue/PAY-926 This PR coordinates workflow activation on instance startup and on leadership change in multiple main scenario in the internal API. Part 3 on manual workflow activation and deactivation will be a separate PR. ### Part 1: Instance startup In multi-main scenario, on starting an instance... - [x] If the instance is the leader, it should add webhooks, triggers and pollers. - [x] If the instance is the follower, it should not add webhooks, triggers or pollers. - [x] Unit tests. ### Part 2: Leadership change In multi-main scenario, if the main instance leader dies… - [x] The new main instance leader must activate all trigger- and poller-based workflows, excluding webhook-based workflows. - [x] The old main instance leader must deactivate all trigger- and poller-based workflows, excluding webhook-based workflows. - [x] Unit tests. To test, start two instances and check behavior on startup and leadership change: ``` EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug npm run start EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug N8N_PORT=5679 npm run start ``` --- packages/cli/src/ActiveWorkflowRunner.ts | 538 +++++++++++------- packages/cli/src/Interfaces.ts | 15 - packages/cli/src/Server.ts | 2 +- packages/cli/src/commands/start.ts | 11 + .../src/databases/entities/WorkflowEntity.ts | 4 + .../repositories/workflow.repository.ts | 14 + .../services/orchestration.base.service.ts | 4 +- .../main/MultiMainInstance.publisher.ee.ts | 6 +- .../integration/ActiveWorkflowRunner.test.ts | 356 ++++++++++++ .../cli/test/integration/shared/testDb.ts | 4 +- .../test/integration/shared/utils/index.ts | 16 +- .../test/unit/ActiveWorkflowRunner.test.ts | 278 --------- packages/core/src/ActiveWorkflows.ts | 189 +++--- packages/workflow/src/Interfaces.ts | 9 +- .../workflow/src/WorkflowActivationError.ts | 11 +- 15 files changed, 839 insertions(+), 618 deletions(-) create mode 100644 packages/cli/test/integration/ActiveWorkflowRunner.test.ts delete mode 100644 packages/cli/test/unit/ActiveWorkflowRunner.test.ts diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index a91e510065..e59c28ab59 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { Service } from 'typedi'; +import Container, { Service } from 'typedi'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import type { @@ -35,8 +35,6 @@ import type express from 'express'; import * as Db from '@/Db'; import type { - IActivationError, - IQueuedWorkflowActivations, IResponseCallbackData, IWebhookManager, IWorkflowDb, @@ -65,99 +63,76 @@ import { webhookNotFoundErrorMessage } from './utils'; import { In } from 'typeorm'; import { WebhookService } from './services/webhook.service'; import { Logger } from './Logger'; +import { WorkflowRepository } from '@/databases/repositories'; +import config from '@/config'; +import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee'; const WEBHOOK_PROD_UNREGISTERED_HINT = "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; @Service() export class ActiveWorkflowRunner implements IWebhookManager { - private activeWorkflows = new ActiveWorkflows(); + activeWorkflows = new ActiveWorkflows(); private activationErrors: { - [key: string]: IActivationError; + [workflowId: string]: { + time: number; // ms + error: { + message: string; + }; + }; } = {}; - private queuedWorkflowActivations: { - [key: string]: IQueuedWorkflowActivations; + private queuedActivations: { + [workflowId: string]: { + activationMode: WorkflowActivateMode; + lastTimeout: number; + timeout: NodeJS.Timeout; + workflowData: IWorkflowDb; + }; } = {}; + isMultiMainScenario = + config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled'); + + multiMainInstancePublisher: MultiMainInstancePublisher | undefined; + constructor( private readonly logger: Logger, private readonly activeExecutions: ActiveExecutions, private readonly externalHooks: ExternalHooks, private readonly nodeTypes: NodeTypes, private readonly webhookService: WebhookService, + private readonly workflowRepository: WorkflowRepository, ) {} async init() { - // Get the active workflows from database + if (this.isMultiMainScenario) { + const { MultiMainInstancePublisher } = await import( + '@/services/orchestration/main/MultiMainInstance.publisher.ee' + ); - // NOTE - // Here I guess we can have a flag on the workflow table like hasTrigger - // so instead of pulling all the active webhooks just pull the actives that have a trigger - const workflowsData: IWorkflowDb[] = (await Db.collections.Workflow.find({ - where: { active: true }, - relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'], - })) as IWorkflowDb[]; + this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher); - if (workflowsData.length !== 0) { - this.logger.info(' ================================'); - this.logger.info(' Start Active Workflows:'); - this.logger.info(' ================================'); - - for (const workflowData of workflowsData) { - this.logger.info(` - ${workflowData.name} (ID: ${workflowData.id})`); - this.logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, { - workflowName: workflowData.name, - workflowId: workflowData.id, - }); - try { - await this.add(workflowData.id, 'init', workflowData); - this.logger.verbose(`Successfully started workflow "${workflowData.name}"`, { - workflowName: workflowData.name, - workflowId: workflowData.id, - }); - this.logger.info(' => Started'); - } catch (error) { - ErrorReporter.error(error); - this.logger.info( - ' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue', - ); - - this.logger.info(` ${error.message}`); - this.logger.error( - `Issue on initial workflow activation try "${workflowData.name}" (startup)`, - { - workflowName: workflowData.name, - workflowId: workflowData.id, - }, - ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - this.executeErrorWorkflow(error, workflowData, 'internal'); - - if (!error.message.includes('Authorization')) { - // Keep on trying to activate the workflow if not an auth issue - this.addQueuedWorkflowActivation('init', workflowData); - } - } - } - this.logger.verbose('Finished initializing active workflows (startup)'); + await this.multiMainInstancePublisher.init(); } + await this.addActiveWorkflows('init'); + await this.externalHooks.run('activeWorkflows.initialized', []); await this.webhookService.populateCache(); } /** - * Removes all the currently active workflows + * Removes all the currently active workflows from memory. */ - async removeAll(): Promise { + async removeAll() { let activeWorkflowIds: string[] = []; this.logger.verbose('Call to remove all active workflows received (removeAll)'); activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows()); - const activeWorkflows = await this.getActiveWorkflows(); + const activeWorkflows = await this.allActiveInStorage(); activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows]; // Make sure IDs are unique activeWorkflowIds = Array.from(new Set(activeWorkflowIds)); @@ -284,76 +259,86 @@ export class ActiveWorkflowRunner implements IWebhookManager { } /** - * Returns the ids of the currently active workflows + * Returns the ids of the currently active workflows from memory. */ - async getActiveWorkflows(user?: User): Promise { - let activeWorkflows: WorkflowEntity[] = []; - if (!user || user.globalRole.name === 'owner') { - activeWorkflows = await Db.collections.Workflow.find({ - select: ['id'], - where: { active: true }, - }); - return activeWorkflows - .map((workflow) => workflow.id) - .filter((workflowId) => !this.activationErrors[workflowId]); - } else { - const active = await Db.collections.Workflow.find({ - select: ['id'], - where: { active: true }, - }); - const activeIds = active.map((workflow) => workflow.id); - const where = whereClause({ - user, - entityType: 'workflow', - }); - Object.assign(where, { workflowId: In(activeIds) }); - const shared = await Db.collections.SharedWorkflow.find({ - select: ['workflowId'], - where, - }); - return shared - .map((id) => id.workflowId) - .filter((workflowId) => !this.activationErrors[workflowId]); - } + allActiveInMemory() { + return this.activeWorkflows.allActiveWorkflows(); } /** - * Returns if the workflow is active - * - * @param {string} id The id of the workflow to check + * Get the IDs of active workflows from storage. */ - async isActive(id: string): Promise { - const workflow = await Db.collections.Workflow.findOne({ - select: ['active'], - where: { id }, + async allActiveInStorage(user?: User) { + const isFullAccess = !user || user.globalRole.name === 'owner'; + + if (isFullAccess) { + const activeWorkflows = await this.workflowRepository.find({ + select: ['id'], + where: { active: true }, + }); + + return activeWorkflows + .map((workflow) => workflow.id) + .filter((workflowId) => !this.activationErrors[workflowId]); + } + + const where = whereClause({ + user, + entityType: 'workflow', }); + + const activeWorkflows = await this.workflowRepository.find({ + select: ['id'], + where: { active: true }, + }); + + const activeIds = activeWorkflows.map((workflow) => workflow.id); + + Object.assign(where, { workflowId: In(activeIds) }); + + const sharings = await Db.collections.SharedWorkflow.find({ + select: ['workflowId'], + where, + }); + + return sharings + .map((sharing) => sharing.workflowId) + .filter((workflowId) => !this.activationErrors[workflowId]); + } + + /** + * Returns if the workflow is stored as `active`. + * + * @important Do not confuse with `ActiveWorkflows.isActive()`, + * which checks if the workflow is active in memory. + */ + async isActive(workflowId: string) { + const workflow = await this.workflowRepository.findOne({ + select: ['active'], + where: { id: workflowId }, + }); + return !!workflow?.active; } /** * Return error if there was a problem activating the workflow - * - * @param {string} id The id of the workflow to return the error of */ - getActivationError(id: string): IActivationError | undefined { - if (this.activationErrors[id] === undefined) { - return undefined; - } - - return this.activationErrors[id]; + getActivationError(workflowId: string) { + return this.activationErrors[workflowId]; } /** - * Adds all the webhooks of the workflow + * Register workflow-defined webhooks in the `workflow_entity` table. */ - async addWorkflowWebhooks( + async addWebhooks( workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, - ): Promise { + ) { const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); - let path = '' as string | undefined; + let path = ''; for (const webhookData of webhooks) { const node = workflow.getNode(webhookData.node) as INode; @@ -401,7 +386,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { } try { - await this.removeWorkflowWebhooks(workflow.id); + await this.clearWebhooks(workflow.id); } catch (error1) { ErrorReporter.error(error1); this.logger.error( @@ -428,14 +413,14 @@ export class ActiveWorkflowRunner implements IWebhookManager { } /** - * Remove all the webhooks of the workflow - * + * Clear workflow-defined webhooks from the `webhook_entity` table. */ - async removeWorkflowWebhooks(workflowId: string): Promise { + async clearWebhooks(workflowId: string) { const workflowData = await Db.collections.Workflow.findOne({ where: { id: workflowId }, relations: ['shared', 'shared.user', 'shared.user.globalRole'], }); + if (workflowData === null) { throw new Error(`Could not find workflow with id "${workflowId}"`); } @@ -468,11 +453,6 @@ export class ActiveWorkflowRunner implements IWebhookManager { await this.webhookService.deleteWorkflowWebhooks(workflowId); } - /** - * Runs the given workflow - * - */ - async runWorkflow( workflowData: IWorkflowDb, node: INode, @@ -520,7 +500,6 @@ export class ActiveWorkflowRunner implements IWebhookManager { /** * Return poll function which gets the global functions from n8n-core * and overwrites the emit to be able to start it in subprocess - * */ getExecutePollFunctions( workflowData: IWorkflowDb, @@ -576,7 +555,6 @@ export class ActiveWorkflowRunner implements IWebhookManager { /** * Return trigger function which gets the global functions from n8n-core * and overwrites the emit to be able to start it in subprocess - * */ getExecuteTriggerFunctions( workflowData: IWorkflowDb, @@ -647,7 +625,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { ); this.executeErrorWorkflow(activationError, workflowData, mode); - this.addQueuedWorkflowActivation(activation, workflowData); + this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity); }; return returnFunctions; }; @@ -676,113 +654,175 @@ export class ActiveWorkflowRunner implements IWebhookManager { } /** - * Makes a workflow active + * Register as active in memory all workflows stored as `active`. + */ + async addActiveWorkflows(activationMode: WorkflowActivateMode) { + const dbWorkflows = await this.workflowRepository.getAllActive(); + + if (dbWorkflows.length === 0) return; + + this.logger.info(' ================================'); + this.logger.info(' Start Active Workflows:'); + this.logger.info(' ================================'); + + for (const dbWorkflow of dbWorkflows) { + this.logger.info(` - ${dbWorkflow.display()}`); + this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }); + + try { + await this.add(dbWorkflow.id, activationMode, dbWorkflow); + + this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }); + this.logger.info(' => Started'); + } catch (error) { + ErrorReporter.error(error); + this.logger.info( + ' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue', + ); + + this.logger.info(` ${error.message}`); + this.logger.error( + `Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`, + { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }, + ); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + this.executeErrorWorkflow(error, dbWorkflow, 'internal'); + + // do not keep trying to activate on authorization error + if (error.message.includes('Authorization')) continue; + + this.addQueuedWorkflowActivation('init', dbWorkflow); + } + } + + this.logger.verbose('Finished activating workflows (startup)'); + } + + async addAllTriggerAndPollerBasedWorkflows() { + this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...'); + + await this.addActiveWorkflows('leadershipChange'); + } + + async removeAllTriggerAndPollerBasedWorkflows() { + this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...'); + + await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); + } + + /** + * Register a workflow as active. * - * @param {string} workflowId The id of the workflow to activate - * @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query + * An activatable workflow may be webhook-, trigger-, or poller-based: + * + * - A `webhook` is an HTTP-based node that can start a workflow when called + * by a third-party service. + * - A `poller` is an HTTP-based node that can start a workflow when detecting + * a change while regularly checking a third-party service. + * - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a + * time-based node like Schedule Trigger or a message-queue-based node. + * + * Note that despite the name, most "trigger" nodes are actually webhook-based + * and so qualify as `webhook`, e.g. Stripe Trigger. + * + * Triggers and pollers are registered as active in memory at `ActiveWorkflows`, + * but webhooks are registered by being entered in the `webhook_entity` table, + * since webhooks do not require continuous execution. */ async add( workflowId: string, - activation: WorkflowActivateMode, - workflowData?: IWorkflowDb, - ): Promise { - let workflowInstance: Workflow; + activationMode: WorkflowActivateMode, + existingWorkflow?: WorkflowEntity, + ) { + let workflow: Workflow; + + let shouldAddWebhooks = true; + let shouldAddTriggersAndPollers = true; + + if (this.isMultiMainScenario && activationMode !== 'leadershipChange') { + shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false; + shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false; + } + + if (this.isMultiMainScenario && activationMode === 'leadershipChange') { + shouldAddWebhooks = false; + shouldAddTriggersAndPollers = true; + } + try { - if (workflowData === undefined) { - workflowData = (await Db.collections.Workflow.findOne({ - where: { id: workflowId }, - relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'], - })) as IWorkflowDb; + const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId)); + + if (!dbWorkflow) { + throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`); } - if (!workflowData) { - throw new Error(`Could not find workflow with id "${workflowId}".`); - } - workflowInstance = new Workflow({ - id: workflowId, - name: workflowData.name, - nodes: workflowData.nodes, - connections: workflowData.connections, - active: workflowData.active, + workflow = new Workflow({ + id: dbWorkflow.id, + name: dbWorkflow.name, + nodes: dbWorkflow.nodes, + connections: dbWorkflow.connections, + active: dbWorkflow.active, nodeTypes: this.nodeTypes, - staticData: workflowData.staticData, - settings: workflowData.settings, + staticData: dbWorkflow.staticData, + settings: dbWorkflow.settings, }); - const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(STARTING_NODES); + const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES); + if (!canBeActivated) { - this.logger.error(`Unable to activate workflow "${workflowData.name}"`); - throw new Error( - 'The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.', + throw new WorkflowActivationError( + `Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`, ); } - const mode = 'trigger'; - const workflowOwner = (workflowData as WorkflowEntity).shared.find( - (shared) => shared.role.name === 'owner', - ); - if (!workflowOwner) { - throw new Error('Workflow cannot be activated because it has no owner'); + const sharing = dbWorkflow.shared.find((shared) => shared.role.name === 'owner'); + + if (!sharing) { + throw new WorkflowActivationError(`Workflow ${dbWorkflow.display()} has no owner`); } - const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.user.id); - const getTriggerFunctions = this.getExecuteTriggerFunctions( - workflowData, - additionalData, - mode, - activation, - ); - const getPollFunctions = this.getExecutePollFunctions( - workflowData, - additionalData, - mode, - activation, - ); - // Add the workflows which have webhooks defined - await this.addWorkflowWebhooks(workflowInstance, additionalData, mode, activation); + const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id); - if ( - workflowInstance.getTriggerNodes().length !== 0 || - workflowInstance.getPollNodes().length !== 0 - ) { - await this.activeWorkflows.add( - workflowId, - workflowInstance, + if (shouldAddWebhooks) { + this.logger.debug('============'); + this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`); + this.logger.debug('============'); + + await this.addWebhooks(workflow, additionalData, 'trigger', activationMode); + } + + if (shouldAddTriggersAndPollers) { + this.logger.debug('============'); + this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`); + this.logger.debug('============'); + + await this.addTriggersAndPollers(dbWorkflow, workflow, { + activationMode, + executionMode: 'trigger', additionalData, - mode, - activation, - getTriggerFunctions, - getPollFunctions, - ); - this.logger.verbose(`Successfully activated workflow "${workflowData.name}"`, { - workflowId, - workflowName: workflowData.name, }); } // Workflow got now successfully activated so make sure nothing is left in the queue this.removeQueuedWorkflowActivation(workflowId); - if (this.activationErrors[workflowId] !== undefined) { - // If there were activation errors delete them + if (this.activationErrors[workflowId]) { delete this.activationErrors[workflowId]; } - if (workflowInstance.id) { - // Sum all triggers in the workflow, EXCLUDING the manual trigger - const triggerFilter = (nodeType: INodeType) => - !!nodeType.trigger && !nodeType.description.name.includes('manualTrigger'); - const triggerCount = - workflowInstance.queryNodes(triggerFilter).length + - workflowInstance.getPollNodes().length + - WebhookHelpers.getWorkflowWebhooks(workflowInstance, additionalData, undefined, true) - .length; - await WorkflowsService.updateWorkflowTriggerCount(workflowInstance.id, triggerCount); - } + const triggerCount = this.countTriggers(workflow, additionalData); + await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount); } catch (error) { - // There was a problem activating the workflow - - // Save the error this.activationErrors[workflowId] = { time: new Date().getTime(), error: { @@ -795,7 +835,24 @@ export class ActiveWorkflowRunner implements IWebhookManager { // If for example webhooks get created it sometimes has to save the // id of them in the static data. So make sure that data gets persisted. - await WorkflowsService.saveStaticData(workflowInstance!); + await WorkflowsService.saveStaticData(workflow); + } + + /** + * Count all triggers in the workflow, excluding Manual Trigger. + */ + private countTriggers( + workflow: Workflow, + additionalData: IWorkflowExecuteAdditionalDataWorkflow, + ) { + const triggerFilter = (nodeType: INodeType) => + !!nodeType.trigger && !nodeType.description.name.includes('manualTrigger'); + + return ( + workflow.queryNodes(triggerFilter).length + + workflow.getPollNodes().length + + WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true).length + ); } /** @@ -803,10 +860,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { * Meaning it will keep on trying to activate it in regular * amounts indefinitely. */ - addQueuedWorkflowActivation( - activationMode: WorkflowActivateMode, - workflowData: IWorkflowDb, - ): void { + addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) { const workflowId = workflowData.id; const workflowName = workflowData.name; @@ -819,7 +873,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { await this.add(workflowId, activationMode, workflowData); } catch (error) { ErrorReporter.error(error); - let lastTimeout = this.queuedWorkflowActivations[workflowId].lastTimeout; + let lastTimeout = this.queuedActivations[workflowId].lastTimeout; if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) { lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT); } @@ -834,8 +888,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { }, ); - this.queuedWorkflowActivations[workflowId].lastTimeout = lastTimeout; - this.queuedWorkflowActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout); + this.queuedActivations[workflowId].lastTimeout = lastTimeout; + this.queuedActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout); return; } this.logger.info( @@ -851,7 +905,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { // multiple run in parallel this.removeQueuedWorkflowActivation(workflowId); - this.queuedWorkflowActivations[workflowId] = { + this.queuedActivations[workflowId] = { activationMode, lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT), @@ -862,18 +916,18 @@ export class ActiveWorkflowRunner implements IWebhookManager { /** * Remove a workflow from the activation queue */ - removeQueuedWorkflowActivation(workflowId: string): void { - if (this.queuedWorkflowActivations[workflowId]) { - clearTimeout(this.queuedWorkflowActivations[workflowId].timeout); - delete this.queuedWorkflowActivations[workflowId]; + removeQueuedWorkflowActivation(workflowId: string) { + if (this.queuedActivations[workflowId]) { + clearTimeout(this.queuedActivations[workflowId].timeout); + delete this.queuedActivations[workflowId]; } } /** * Remove all workflows from the activation queue */ - removeAllQueuedWorkflowActivations(): void { - for (const workflowId in this.queuedWorkflowActivations) { + removeAllQueuedWorkflowActivations() { + for (const workflowId in this.queuedActivations) { this.removeQueuedWorkflowActivation(workflowId); } } @@ -884,10 +938,10 @@ export class ActiveWorkflowRunner implements IWebhookManager { * @param {string} workflowId The id of the workflow to deactivate */ // TODO: this should happen in a transaction - async remove(workflowId: string): Promise { + async remove(workflowId: string) { // Remove all the webhooks of the workflow try { - await this.removeWorkflowWebhooks(workflowId); + await this.clearWebhooks(workflowId); } catch (error) { ErrorReporter.error(error); this.logger.error( @@ -900,7 +954,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { delete this.activationErrors[workflowId]; } - if (this.queuedWorkflowActivations[workflowId] !== undefined) { + if (this.queuedActivations[workflowId] !== undefined) { this.removeQueuedWorkflowActivation(workflowId); } @@ -913,4 +967,52 @@ export class ActiveWorkflowRunner implements IWebhookManager { } } } + + /** + * Register as active in memory a trigger- or poller-based workflow. + */ + async addTriggersAndPollers( + dbWorkflow: WorkflowEntity, + workflow: Workflow, + { + activationMode, + executionMode, + additionalData, + }: { + activationMode: WorkflowActivateMode; + executionMode: WorkflowExecuteMode; + additionalData: IWorkflowExecuteAdditionalDataWorkflow; + }, + ) { + const getTriggerFunctions = this.getExecuteTriggerFunctions( + dbWorkflow, + additionalData, + executionMode, + activationMode, + ); + + const getPollFunctions = this.getExecutePollFunctions( + dbWorkflow, + additionalData, + executionMode, + activationMode, + ); + + if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) { + await this.activeWorkflows.add( + workflow.id, + workflow, + additionalData, + executionMode, + activationMode, + getTriggerFunctions, + getPollFunctions, + ); + + this.logger.verbose(`Workflow ${dbWorkflow.display()} activated`, { + workflowId: dbWorkflow.id, + workflowName: dbWorkflow.name, + }); + } + } } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index f5838df115..0cdcac07df 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -16,7 +16,6 @@ import type { IWorkflowBase, CredentialLoadingDetails, Workflow, - WorkflowActivateMode, WorkflowExecuteMode, ExecutionStatus, IExecutionsSummary, @@ -64,20 +63,6 @@ import type { import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants'; import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types'; -export interface IActivationError { - time: number; - error: { - message: string; - }; -} - -export interface IQueuedWorkflowActivations { - activationMode: WorkflowActivateMode; - lastTimeout: number; - timeout: NodeJS.Timeout; - workflowData: IWorkflowDb; -} - export interface ICredentialsTypeData { [key: string]: CredentialLoadingDetails; } diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index f94e5e088b..33a81e76d8 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -620,7 +620,7 @@ export class Server extends AbstractServer { this.app.get( `/${this.restEndpoint}/active`, ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => { - return this.activeWorkflowRunner.getActiveWorkflows(req.user); + return this.activeWorkflowRunner.allActiveInStorage(req.user); }), ); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 097e069de8..432135f8fe 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -119,6 +119,8 @@ export class Start extends BaseCommand { '@/services/orchestration/main/MultiMainInstance.publisher.ee' ); + await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); + await Container.get(MultiMainInstancePublisher).destroy(); } @@ -251,6 +253,15 @@ export class Start extends BaseCommand { } await Container.get(OrchestrationHandlerMainService).init(); + + multiMainInstancePublisher.on('leadershipChange', async () => { + if (multiMainInstancePublisher.isLeader) { + await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); + } else { + // only in case of leadership change without shutdown + await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); + } + }); } async run() { diff --git a/packages/cli/src/databases/entities/WorkflowEntity.ts b/packages/cli/src/databases/entities/WorkflowEntity.ts index 5b0f5f29eb..332b71ecd5 100644 --- a/packages/cli/src/databases/entities/WorkflowEntity.ts +++ b/packages/cli/src/databases/entities/WorkflowEntity.ts @@ -89,6 +89,10 @@ export class WorkflowEntity extends WithTimestampsAndStringId implements IWorkfl @Column({ default: 0 }) triggerCount: number; + + display() { + return `"${this.name}" (ID: ${this.id})`; + } } /** diff --git a/packages/cli/src/databases/repositories/workflow.repository.ts b/packages/cli/src/databases/repositories/workflow.repository.ts index 5085f47be4..20a937f0eb 100644 --- a/packages/cli/src/databases/repositories/workflow.repository.ts +++ b/packages/cli/src/databases/repositories/workflow.repository.ts @@ -7,4 +7,18 @@ export class WorkflowRepository extends Repository { constructor(dataSource: DataSource) { super(WorkflowEntity, dataSource.manager); } + + async getAllActive() { + return this.find({ + where: { active: true }, + relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'], + }); + } + + async findById(workflowId: string) { + return this.findOne({ + where: { id: workflowId }, + relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'], + }); + } } diff --git a/packages/cli/src/services/orchestration.base.service.ts b/packages/cli/src/services/orchestration.base.service.ts index d81e9ca05c..113298e786 100644 --- a/packages/cli/src/services/orchestration.base.service.ts +++ b/packages/cli/src/services/orchestration.base.service.ts @@ -2,8 +2,9 @@ import Container from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import config from '@/config'; +import { EventEmitter } from 'node:events'; -export abstract class OrchestrationService { +export abstract class OrchestrationService extends EventEmitter { protected initialized = false; protected queueModeId: string; @@ -29,6 +30,7 @@ export abstract class OrchestrationService { } constructor() { + super(); this.redisService = Container.get(RedisService); this.queueModeId = config.getEnv('redis.queueModeId'); } diff --git a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts index 62c92c6e97..60ef60fda9 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts @@ -13,11 +13,11 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher { private leaderId: string | undefined; - public get isLeader() { + get isLeader() { return this.id === this.leaderId; } - public get isFollower() { + get isFollower() { return !this.isLeader; } @@ -84,6 +84,8 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher { this.leaderId = this.id; + this.emit('leadershipChange', this.id); + await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); } } diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts new file mode 100644 index 0000000000..3b01d57b8d --- /dev/null +++ b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts @@ -0,0 +1,356 @@ +import { Container } from 'typedi'; + +import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; +import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; +import { ActiveWorkflows } from 'n8n-core'; + +import { ActiveExecutions } from '@/ActiveExecutions'; +import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; +import config from '@/config'; +import { ExternalHooks } from '@/ExternalHooks'; +import { Push } from '@/push'; +import { SecretsHelper } from '@/SecretsHelpers'; +import { WebhookService } from '@/services/webhook.service'; +import * as WebhookHelpers from '@/WebhookHelpers'; +import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; +import { WorkflowRunner } from '@/WorkflowRunner'; + +import { mockInstance, setSchedulerAsLoadedNode } from './shared/utils'; +import * as testDb from './shared/testDb'; +import type { User } from '@/databases/entities/User'; +import type { WebhookEntity } from '@/databases/entities/WebhookEntity'; +import { NodeTypes } from '@/NodeTypes'; +import { chooseRandomly } from './shared/random'; +import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee'; + +mockInstance(ActiveExecutions); +mockInstance(ActiveWorkflows); +mockInstance(Push); +mockInstance(SecretsHelper); +mockInstance(MultiMainInstancePublisher); + +const webhookService = mockInstance(WebhookService); + +setSchedulerAsLoadedNode(); + +const externalHooks = mockInstance(ExternalHooks); + +let activeWorkflowRunner: ActiveWorkflowRunner; +let owner: User; + +const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [ + 'init', + 'create', + 'update', + 'activate', + 'manual', +]; + +beforeAll(async () => { + await testDb.init(); + + activeWorkflowRunner = Container.get(ActiveWorkflowRunner); + owner = await testDb.createOwner(); +}); + +afterEach(async () => { + await activeWorkflowRunner.removeAll(); + activeWorkflowRunner.removeAllQueuedWorkflowActivations(); + await testDb.truncate(['Workflow']); + config.load(config.default); + jest.restoreAllMocks(); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +describe('init()', () => { + test('should call `ExternalHooks.run()`', async () => { + const runSpy = jest.spyOn(externalHooks, 'run'); + + await activeWorkflowRunner.init(); + + expect(runSpy).toHaveBeenCalledTimes(1); + const [hook, arg] = runSpy.mock.calls[0]; + expect(hook).toBe('activeWorkflows.initialized'); + expect(arg).toBeEmptyArray(); + }); + + test('should start with no active workflows', async () => { + await activeWorkflowRunner.init(); + + const inStorage = activeWorkflowRunner.allActiveInStorage(); + await expect(inStorage).resolves.toHaveLength(0); + + const inMemory = activeWorkflowRunner.allActiveInMemory(); + expect(inMemory).toHaveLength(0); + }); + + test('should start with one active workflow', async () => { + await testDb.createWorkflow({ active: true }, owner); + + await activeWorkflowRunner.init(); + + const inStorage = activeWorkflowRunner.allActiveInStorage(); + await expect(inStorage).resolves.toHaveLength(1); + + const inMemory = activeWorkflowRunner.allActiveInMemory(); + expect(inMemory).toHaveLength(1); + }); + + test('should start with multiple active workflows', async () => { + await testDb.createWorkflow({ active: true }, owner); + await testDb.createWorkflow({ active: true }, owner); + + await activeWorkflowRunner.init(); + + const inStorage = activeWorkflowRunner.allActiveInStorage(); + await expect(inStorage).resolves.toHaveLength(2); + + const inMemory = activeWorkflowRunner.allActiveInMemory(); + expect(inMemory).toHaveLength(2); + }); + + test('should pre-check that every workflow can be activated', async () => { + await testDb.createWorkflow({ active: true }, owner); + await testDb.createWorkflow({ active: true }, owner); + + const precheckSpy = jest + .spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated') + .mockReturnValue(true); + + await activeWorkflowRunner.init(); + + expect(precheckSpy).toHaveBeenCalledTimes(2); + }); +}); + +describe('removeAll()', () => { + test('should remove all active workflows from memory', async () => { + await testDb.createWorkflow({ active: true }, owner); + await testDb.createWorkflow({ active: true }, owner); + + await activeWorkflowRunner.init(); + await activeWorkflowRunner.removeAll(); + + const inMemory = activeWorkflowRunner.allActiveInMemory(); + expect(inMemory).toHaveLength(0); + }); +}); + +describe('remove()', () => { + test('should call `ActiveWorkflowRunner.clearWebhooks()`', async () => { + const workflow = await testDb.createWorkflow({ active: true }, owner); + const clearWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'clearWebhooks'); + + await activeWorkflowRunner.init(); + await activeWorkflowRunner.remove(workflow.id); + + expect(clearWebhooksSpy).toHaveBeenCalledTimes(1); + }); +}); + +describe('isActive()', () => { + test('should return `true` for active workflow in storage', async () => { + const workflow = await testDb.createWorkflow({ active: true }, owner); + + await activeWorkflowRunner.init(); + + const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id); + await expect(isActiveInStorage).resolves.toBe(true); + }); + + test('should return `false` for inactive workflow in storage', async () => { + const workflow = await testDb.createWorkflow({ active: false }, owner); + + await activeWorkflowRunner.init(); + + const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id); + await expect(isActiveInStorage).resolves.toBe(false); + }); +}); + +describe('runWorkflow()', () => { + test('should call `WorkflowRunner.run()`', async () => { + const workflow = await testDb.createWorkflow({ active: true }, owner); + + await activeWorkflowRunner.init(); + + const additionalData = await AdditionalData.getBase('fake-user-id'); + + const runSpy = jest + .spyOn(WorkflowRunner.prototype, 'run') + .mockResolvedValue('fake-execution-id'); + + const [node] = workflow.nodes; + + await activeWorkflowRunner.runWorkflow(workflow, node, [[]], additionalData, 'trigger'); + + expect(runSpy).toHaveBeenCalledTimes(1); + }); +}); + +describe('executeErrorWorkflow()', () => { + test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => { + const workflow = await testDb.createWorkflow({ active: true }, owner); + const [node] = workflow.nodes; + const error = new NodeOperationError(node, 'Fake error message'); + const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow'); + + await activeWorkflowRunner.init(); + activeWorkflowRunner.executeErrorWorkflow(error, workflow, 'trigger'); + + expect(executeSpy).toHaveBeenCalledTimes(1); + }); + + test('should be called on failure to activate due to 401', async () => { + const storedWorkflow = await testDb.createWorkflow({ active: true }, owner); + const [node] = storedWorkflow.nodes; + const executeSpy = jest.spyOn(activeWorkflowRunner, 'executeErrorWorkflow'); + + jest.spyOn(activeWorkflowRunner, 'add').mockImplementation(() => { + throw new NodeApiError(node, { + httpCode: '401', + message: 'Authorization failed - please check your credentials', + }); + }); + + await activeWorkflowRunner.init(); + + expect(executeSpy).toHaveBeenCalledTimes(1); + const [error, workflow] = executeSpy.mock.calls[0]; + expect(error.message).toContain('Authorization'); + expect(workflow.id).toBe(storedWorkflow.id); + }); +}); + +describe('add()', () => { + describe('in single-main scenario', () => { + test('leader should add webhooks, triggers and pollers', async () => { + const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); + + const workflow = await testDb.createWorkflow({ active: true }, owner); + + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + + await activeWorkflowRunner.init(); + + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); + + await activeWorkflowRunner.add(workflow.id, mode); + + expect(addWebhooksSpy).toHaveBeenCalledTimes(1); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('in multi-main scenario', () => { + describe('leader', () => { + test('on regular activation mode, leader should add webhooks only', async () => { + const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); + + jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + + mockInstance(MultiMainInstancePublisher, { isLeader: true }); + + const workflow = await testDb.createWorkflow({ active: true }, owner); + + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); + + await activeWorkflowRunner.add(workflow.id, mode); + + expect(addWebhooksSpy).toHaveBeenCalledTimes(1); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); + + test('on activation via leadership change, leader should add triggers and pollers only', async () => { + const mode = 'leadershipChange'; + + jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + + mockInstance(MultiMainInstancePublisher, { isLeader: true }); + + const workflow = await testDb.createWorkflow({ active: true }, owner); + + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); + + await activeWorkflowRunner.add(workflow.id, mode); + + expect(addWebhooksSpy).not.toHaveBeenCalled(); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('follower', () => { + test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => { + const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); + + jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); + + mockInstance(MultiMainInstancePublisher, { isLeader: false }); + + const workflow = await testDb.createWorkflow({ active: true }, owner); + + const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); + + await activeWorkflowRunner.init(); + addWebhooksSpy.mockReset(); + addTriggersAndPollersSpy.mockReset(); + + await activeWorkflowRunner.add(workflow.id, mode); + + expect(addWebhooksSpy).not.toHaveBeenCalled(); + expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + }); + }); + }); +}); + +describe('addWebhooks()', () => { + test('should call `WebhookService.storeWebhook()`', async () => { + const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData; + const mockWebhookEntity = { webhookPath: 'fake-path' } as unknown as WebhookEntity; + + jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([mockWebhook]); + webhookService.createWebhook.mockReturnValue(mockWebhookEntity); + + const additionalData = await AdditionalData.getBase('fake-user-id'); + + const dbWorkflow = await testDb.createWorkflow({ active: true }, owner); + + const workflow = new Workflow({ + id: dbWorkflow.id, + name: dbWorkflow.name, + nodes: dbWorkflow.nodes, + connections: dbWorkflow.connections, + active: dbWorkflow.active, + nodeTypes: Container.get(NodeTypes), + staticData: dbWorkflow.staticData, + settings: dbWorkflow.settings, + }); + + const [node] = dbWorkflow.nodes; + + jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node); + jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true); + jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined); + + await activeWorkflowRunner.addWebhooks(workflow, additionalData, 'trigger', 'init'); + + expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/cli/test/integration/shared/testDb.ts b/packages/cli/test/integration/shared/testDb.ts index 9ef0e00c0f..7330b9c8e6 100644 --- a/packages/cli/test/integration/shared/testDb.ts +++ b/packages/cli/test/integration/shared/testDb.ts @@ -459,10 +459,10 @@ export async function createWorkflow(attributes: Partial = {}, u nodes: nodes ?? [ { id: 'uuid-1234', - name: 'Start', + name: 'Schedule Trigger', parameters: {}, position: [-20, 260], - type: 'n8n-nodes-base.start', + type: 'n8n-nodes-base.scheduleTrigger', typeVersion: 1, }, ], diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index f4265e59e9..ac45600e69 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -1,6 +1,6 @@ import { Container } from 'typedi'; import { BinaryDataService } from 'n8n-core'; -import type { INode } from 'n8n-workflow'; +import { type INode } from 'n8n-workflow'; import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials'; import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials'; import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node'; @@ -16,6 +16,8 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { AUTH_COOKIE_NAME } from '@/constants'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import { mockInstance } from './mocking'; +import { mockNodeTypesData } from '../../../unit/Helpers'; export { mockInstance } from './mocking'; export { setupTestServer } from './testServer'; @@ -166,3 +168,15 @@ export function makeWorkflow(options?: { } export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] }; + +export function setSchedulerAsLoadedNode() { + const nodesAndCredentials = mockInstance(LoadNodesAndCredentials); + + Object.assign(nodesAndCredentials, { + loadedNodes: mockNodeTypesData(['scheduleTrigger'], { + addTrigger: true, + }), + known: { nodes: {}, credentials: {} }, + types: { nodes: [], credentials: [] }, + }); +} diff --git a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts deleted file mode 100644 index 06527984fa..0000000000 --- a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts +++ /dev/null @@ -1,278 +0,0 @@ -import { v4 as uuid } from 'uuid'; -import { mocked } from 'jest-mock'; -import { Container } from 'typedi'; - -import type { INode } from 'n8n-workflow'; -import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; - -import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import * as Db from '@/Db'; -import { WorkflowEntity } from '@db/entities/WorkflowEntity'; -import { SharedWorkflow } from '@db/entities/SharedWorkflow'; -import { Role } from '@db/entities/Role'; -import { User } from '@db/entities/User'; -import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; -import { WorkflowRunner } from '@/WorkflowRunner'; -import { ExternalHooks } from '@/ExternalHooks'; -import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import { Push } from '@/push'; -import { ActiveExecutions } from '@/ActiveExecutions'; -import { SecretsHelper } from '@/SecretsHelpers'; -import { WebhookService } from '@/services/webhook.service'; -import { VariablesService } from '@/environments/variables/variables.service'; - -import { mockInstance } from '../integration/shared/utils/'; -import { randomEmail, randomName } from '../integration/shared/random'; -import * as Helpers from './Helpers'; - -/** - * TODO: - * - test workflow webhooks activation (that trigger `executeWebhook`and other webhook methods) - * - test activation error catching and getters such as `getActivationError` (requires building a workflow that fails to activate) - * - test queued workflow activation functions (might need to create a non-working workflow to test this) - */ - -let databaseActiveWorkflowsCount = 0; -let databaseActiveWorkflowsList: WorkflowEntity[] = []; - -const generateWorkflows = (count: number): WorkflowEntity[] => { - const workflows: WorkflowEntity[] = []; - const ownerRole = new Role(); - ownerRole.scope = 'workflow'; - ownerRole.name = 'owner'; - ownerRole.id = '1'; - - const owner = new User(); - owner.id = uuid(); - owner.firstName = randomName(); - owner.lastName = randomName(); - owner.email = randomEmail(); - - for (let i = 0; i < count; i++) { - const workflow = new WorkflowEntity(); - Object.assign(workflow, { - id: (i + 1).toString(), - name: randomName(), - active: true, - createdAt: new Date(), - updatedAt: new Date(), - nodes: [ - { - parameters: { - rule: { - interval: [{}], - }, - }, - id: uuid(), - name: 'Schedule Trigger', - type: 'n8n-nodes-base.scheduleTrigger', - typeVersion: 1, - position: [900, 460], - }, - ], - connections: {}, - tags: [], - }); - const sharedWorkflow = new SharedWorkflow(); - sharedWorkflow.workflowId = workflow.id; - sharedWorkflow.role = ownerRole; - sharedWorkflow.user = owner; - - workflow.shared = [sharedWorkflow]; - - workflows.push(workflow); - } - databaseActiveWorkflowsList = workflows; - return workflows; -}; - -const MOCK_NODE_TYPES_DATA = Helpers.mockNodeTypesData(['scheduleTrigger'], { - addTrigger: true, -}); - -jest.mock('@/Db', () => { - return { - collections: { - Workflow: { - find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)), - findOne: jest.fn(async (searchParams) => { - return databaseActiveWorkflowsList.find( - (workflow) => workflow.id === searchParams.where.id.toString(), - ); - }), - update: jest.fn(), - createQueryBuilder: jest.fn(() => { - const fakeQueryBuilder = { - update: () => fakeQueryBuilder, - set: () => fakeQueryBuilder, - where: () => fakeQueryBuilder, - execute: async () => {}, - }; - return fakeQueryBuilder; - }), - }, - }, - }; -}); - -const workflowCheckIfCanBeActivated = jest.fn(() => true); - -jest - .spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated') - .mockImplementation(workflowCheckIfCanBeActivated); - -const removeFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'remove'); -const removeWebhooksFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'removeWorkflowWebhooks'); -const workflowRunnerRun = jest.spyOn(WorkflowRunner.prototype, 'run'); -const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn( - WorkflowExecuteAdditionalData, - 'executeErrorWorkflow', -); - -describe('ActiveWorkflowRunner', () => { - mockInstance(ActiveExecutions); - const externalHooks = mockInstance(ExternalHooks); - const webhookService = mockInstance(WebhookService); - mockInstance(Push); - mockInstance(SecretsHelper); - const variablesService = mockInstance(VariablesService); - const nodesAndCredentials = mockInstance(LoadNodesAndCredentials); - Object.assign(nodesAndCredentials, { - loadedNodes: MOCK_NODE_TYPES_DATA, - known: { nodes: {}, credentials: {} }, - types: { nodes: [], credentials: [] }, - }); - - const activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - - beforeAll(async () => { - variablesService.getAllCached.mockResolvedValue([]); - }); - - afterEach(async () => { - await activeWorkflowRunner.removeAll(); - databaseActiveWorkflowsCount = 0; - databaseActiveWorkflowsList = []; - jest.clearAllMocks(); - }); - - test('Should initialize activeWorkflowRunner with empty list of active workflows and call External Hooks', async () => { - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0); - expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); - expect(externalHooks.run).toHaveBeenCalledTimes(1); - }); - - test('Should initialize activeWorkflowRunner with one active workflow', async () => { - databaseActiveWorkflowsCount = 1; - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength( - databaseActiveWorkflowsCount, - ); - expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); - expect(externalHooks.run).toHaveBeenCalled(); - }); - - test('Should make sure function checkIfWorkflowCanBeActivated was called for every workflow', async () => { - databaseActiveWorkflowsCount = 2; - await activeWorkflowRunner.init(); - expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(databaseActiveWorkflowsCount); - }); - - test('Call to removeAll should remove every workflow', async () => { - databaseActiveWorkflowsCount = 2; - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength( - databaseActiveWorkflowsCount, - ); - await activeWorkflowRunner.removeAll(); - expect(removeFunction).toHaveBeenCalledTimes(databaseActiveWorkflowsCount); - }); - - test('Call to remove should also call removeWorkflowWebhooks', async () => { - databaseActiveWorkflowsCount = 1; - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength( - databaseActiveWorkflowsCount, - ); - await activeWorkflowRunner.remove('1'); - expect(removeWebhooksFunction).toHaveBeenCalledTimes(1); - }); - - test('Call to isActive should return true for valid workflow', async () => { - databaseActiveWorkflowsCount = 1; - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.isActive('1')).toBe(true); - }); - - test('Call to isActive should return false for invalid workflow', async () => { - databaseActiveWorkflowsCount = 1; - await activeWorkflowRunner.init(); - expect(await activeWorkflowRunner.isActive('2')).toBe(false); - }); - - test('Calling add should call checkIfWorkflowCanBeActivated', async () => { - // Initialize with default (0) workflows - await activeWorkflowRunner.init(); - generateWorkflows(1); - await activeWorkflowRunner.add('1', 'activate'); - expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(1); - }); - - test('runWorkflow should call run method in WorkflowRunner', async () => { - await activeWorkflowRunner.init(); - const workflow = generateWorkflows(1); - const additionalData = await WorkflowExecuteAdditionalData.getBase('fake-user-id'); - - workflowRunnerRun.mockResolvedValueOnce('invalid-execution-id'); - - await activeWorkflowRunner.runWorkflow( - workflow[0], - workflow[0].nodes[0], - [[]], - additionalData, - 'trigger', - ); - - expect(workflowRunnerRun).toHaveBeenCalledTimes(1); - }); - - test('executeErrorWorkflow should call function with same name in WorkflowExecuteAdditionalData', async () => { - const workflowData = generateWorkflows(1)[0]; - const error = new NodeOperationError(workflowData.nodes[0], 'Fake error message'); - await activeWorkflowRunner.init(); - activeWorkflowRunner.executeErrorWorkflow(error, workflowData, 'trigger'); - expect(workflowExecuteAdditionalDataExecuteErrorWorkflowSpy).toHaveBeenCalledTimes(1); - }); - - describe('init()', () => { - it('should execute error workflow on failure to activate due to 401', async () => { - databaseActiveWorkflowsCount = 1; - - jest.spyOn(ActiveWorkflowRunner.prototype, 'add').mockImplementation(() => { - throw new NodeApiError( - { - id: 'a75dcd1b-9fed-4643-90bd-75933d67936c', - name: 'Github Trigger', - type: 'n8n-nodes-base.githubTrigger', - typeVersion: 1, - position: [0, 0], - } as INode, - { - httpCode: '401', - message: 'Authorization failed - please check your credentials', - }, - ); - }); - - const executeSpy = jest.spyOn(ActiveWorkflowRunner.prototype, 'executeErrorWorkflow'); - - await activeWorkflowRunner.init(); - - const [error, workflow] = executeSpy.mock.calls[0]; - - expect(error.message).toContain('Authorization'); - expect(workflow.id).toBe('1'); - }); - }); -}); diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index 5218d2c885..74f25ccfb5 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -1,5 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unsafe-argument */ - import { CronJob } from 'cron'; import type { @@ -14,62 +12,64 @@ import type { WorkflowActivateMode, WorkflowExecuteMode, } from 'n8n-workflow'; -import { LoggerProxy as Logger, toCronExpression, WorkflowActivationError } from 'n8n-workflow'; +import { + LoggerProxy as Logger, + toCronExpression, + WorkflowActivationError, + WorkflowDeactivationError, +} from 'n8n-workflow'; import type { IWorkflowData } from './Interfaces'; export class ActiveWorkflows { - private workflowData: { - [key: string]: IWorkflowData; + private activeWorkflows: { + [workflowId: string]: IWorkflowData; } = {}; /** - * Returns if the workflow is active - * - * @param {string} id The id of the workflow to check + * Returns if the workflow is active in memory. */ - isActive(id: string): boolean { - return this.workflowData.hasOwnProperty(id); + isActive(workflowId: string) { + return this.activeWorkflows.hasOwnProperty(workflowId); } /** - * Returns the ids of the currently active workflows - * + * Returns the IDs of the currently active workflows in memory. */ - allActiveWorkflows(): string[] { - return Object.keys(this.workflowData); + allActiveWorkflows() { + return Object.keys(this.activeWorkflows); } /** - * Returns the Workflow data for the workflow with - * the given id if it is currently active - * + * Returns the workflow data for the given ID if currently active in memory. */ - get(id: string): IWorkflowData | undefined { - return this.workflowData[id]; + get(workflowId: string) { + return this.activeWorkflows[workflowId]; } /** * Makes a workflow active * - * @param {string} id The id of the workflow to activate + * @param {string} workflowId The id of the workflow to activate * @param {Workflow} workflow The workflow to activate * @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows */ async add( - id: string, + workflowId: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions, - ): Promise { - this.workflowData[id] = {}; + ) { + this.activeWorkflows[workflowId] = {}; const triggerNodes = workflow.getTriggerNodes(); let triggerResponse: ITriggerResponse | undefined; - this.workflowData[id].triggerResponses = []; + + this.activeWorkflows[workflowId].triggerResponses = []; + for (const triggerNode of triggerNodes) { try { triggerResponse = await workflow.runTrigger( @@ -82,46 +82,49 @@ export class ActiveWorkflows { if (triggerResponse !== undefined) { // If a response was given save it - this.workflowData[id].triggerResponses!.push(triggerResponse); + this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse); } - } catch (error) { + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + throw new WorkflowActivationError( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access `There was a problem activating the workflow: "${error.message}"`, - { cause: error as Error, node: triggerNode }, + { cause: error, node: triggerNode }, ); } } - const pollNodes = workflow.getPollNodes(); - if (pollNodes.length) { - this.workflowData[id].pollResponses = []; - for (const pollNode of pollNodes) { - try { - this.workflowData[id].pollResponses!.push( - await this.activatePolling( - pollNode, - workflow, - additionalData, - getPollFunctions, - mode, - activation, - ), - ); - } catch (error) { - throw new WorkflowActivationError( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem activating the workflow: "${error.message}"`, - { cause: error as Error, node: pollNode }, - ); - } + const pollingNodes = workflow.getPollNodes(); + + if (pollingNodes.length === 0) return; + + this.activeWorkflows[workflowId].pollResponses = []; + + for (const pollNode of pollingNodes) { + try { + this.activeWorkflows[workflowId].pollResponses!.push( + await this.activatePolling( + pollNode, + workflow, + additionalData, + getPollFunctions, + mode, + activation, + ), + ); + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + + throw new WorkflowActivationError( + `There was a problem activating the workflow: "${error.message}"`, + { cause: error, node: pollNode }, + ); } } } /** * Activates polling for the given node - * */ async activatePolling( node: INode, @@ -159,7 +162,7 @@ export class ActiveWorkflows { if (testingTrigger) { throw error; } - pollFunctions.__emitError(error); + pollFunctions.__emitError(error as Error); } }; @@ -193,59 +196,49 @@ export class ActiveWorkflows { } /** - * Makes a workflow inactive - * - * @param {string} id The id of the workflow to deactivate + * Makes a workflow inactive in memory. */ - async remove(id: string): Promise { - if (!this.isActive(id)) { - // Workflow is currently not registered - Logger.warn( - `The workflow with the id "${id}" is currently not active and can so not be removed`, - ); + async remove(workflowId: string) { + if (!this.isActive(workflowId)) { + Logger.warn(`Cannot deactivate already inactive workflow ID "${workflowId}"`); return false; } - const workflowData = this.workflowData[id]; + const w = this.activeWorkflows[workflowId]; - if (workflowData.triggerResponses) { - for (const triggerResponse of workflowData.triggerResponses) { - if (triggerResponse.closeFunction) { - try { - await triggerResponse.closeFunction(); - } catch (error) { - Logger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem deactivating trigger of workflow "${id}": "${error.message}"`, - { - workflowId: id, - }, - ); - } - } - } - } + w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger')); + w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller')); - if (workflowData.pollResponses) { - for (const pollResponse of workflowData.pollResponses) { - if (pollResponse.closeFunction) { - try { - await pollResponse.closeFunction(); - } catch (error) { - Logger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem deactivating polling trigger of workflow "${id}": "${error.message}"`, - { - workflowId: id, - }, - ); - } - } - } - } - - delete this.workflowData[id]; + delete this.activeWorkflows[workflowId]; return true; } + + async removeAllTriggerAndPollerBasedWorkflows() { + for (const workflowId of Object.keys(this.activeWorkflows)) { + const w = this.activeWorkflows[workflowId]; + + w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger')); + w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller')); + } + } + + private async close( + response: ITriggerResponse | IPollResponse, + workflowId: string, + target: 'trigger' | 'poller', + ) { + if (!response.closeFunction) return; + + try { + await response.closeFunction(); + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + + throw new WorkflowDeactivationError( + `Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`, + { cause: error, workflowId }, + ); + } + } } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 0fc46dd988..a8ea204546 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1905,7 +1905,14 @@ export type WorkflowExecuteMode = | 'retry' | 'trigger' | 'webhook'; -export type WorkflowActivateMode = 'init' | 'create' | 'update' | 'activate' | 'manual'; + +export type WorkflowActivateMode = + | 'init' + | 'create' + | 'update' + | 'activate' + | 'manual' + | 'leadershipChange'; export interface IWorkflowHooksOptionalParameters { parentProcessMode?: string; diff --git a/packages/workflow/src/WorkflowActivationError.ts b/packages/workflow/src/WorkflowActivationError.ts index d9a7943a99..3b664b21bb 100644 --- a/packages/workflow/src/WorkflowActivationError.ts +++ b/packages/workflow/src/WorkflowActivationError.ts @@ -5,6 +5,7 @@ interface WorkflowActivationErrorOptions { cause?: Error; node?: INode; severity?: Severity; + workflowId?: string; } /** @@ -13,7 +14,12 @@ interface WorkflowActivationErrorOptions { export class WorkflowActivationError extends ExecutionBaseError { node: INode | undefined; - constructor(message: string, { cause, node, severity }: WorkflowActivationErrorOptions) { + workflowId: string | undefined; + + constructor( + message: string, + { cause, node, severity, workflowId }: WorkflowActivationErrorOptions = {}, + ) { let error = cause as Error; if (cause instanceof ExecutionBaseError) { error = new Error(cause.message); @@ -23,11 +29,14 @@ export class WorkflowActivationError extends ExecutionBaseError { } super(message, { cause: error }); this.node = node; + this.workflowId = workflowId; this.message = message; if (severity) this.severity = severity; } } +export class WorkflowDeactivationError extends WorkflowActivationError {} + export class WebhookPathAlreadyTakenError extends WorkflowActivationError { constructor(nodeName: string, cause?: Error) { super(