diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index 55775f64b1..f501a636db 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -148,4 +148,21 @@ describe('ActiveWorkflowManager', () => { ); }); }); + + describe('addActiveWorkflows', () => { + test('should prevent concurrent activations', async () => { + const getAllActiveIds = jest.spyOn(workflowRepository, 'getAllActiveIds'); + + workflowRepository.getAllActiveIds.mockImplementation( + async () => await new Promise((resolve) => setTimeout(() => resolve(['workflow-1']), 50)), + ); + + await Promise.all([ + activeWorkflowManager.addActiveWorkflows('init'), + activeWorkflowManager.addActiveWorkflows('leadershipChange'), + ]); + + expect(getAllActiveIds).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 6ce6903a83..009116061e 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -418,30 +418,42 @@ export class ActiveWorkflowManager { executeErrorWorkflow(workflowData, fullRunData, mode); } + private isActivationInProgress = false; + /** * Register as active in memory all workflows stored as `active`, * only on instance init or (in multi-main setup) on leadership change. */ async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') { - const dbWorkflowIds = await this.workflowRepository.getAllActiveIds(); - - if (dbWorkflowIds.length === 0) return; - - if (this.instanceSettings.isLeader) { - this.logger.info('Start Active Workflows:'); + if (this.isActivationInProgress) { + this.logger.debug(`Skipping activation - already in progress for mode: ${activationMode}`); + return; } - const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize); + this.isActivationInProgress = true; + try { + const dbWorkflowIds = await this.workflowRepository.getAllActiveIds(); - for (const batch of batches) { - const activationPromises = batch.map(async (dbWorkflowId) => { - await this.activateWorkflow(dbWorkflowId, activationMode); - }); + if (dbWorkflowIds.length === 0) return; - await Promise.all(activationPromises); + if (this.instanceSettings.isLeader) { + this.logger.info('Start Active Workflows:'); + } + + const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize); + + for (const batch of batches) { + const activationPromises = batch.map(async (dbWorkflowId) => { + await this.activateWorkflow(dbWorkflowId, activationMode); + }); + + await Promise.all(activationPromises); + } + + this.logger.debug('Finished activating all workflows'); + } finally { + this.isActivationInProgress = false; } - - this.logger.debug('Finished activating all workflows'); } private async activateWorkflow(