mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
fix(core): Prevent re-entry during workflow activation (#17965)
This commit is contained in:
@@ -148,4 +148,21 @@ describe('ActiveWorkflowManager', () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('addActiveWorkflows', () => {
|
||||||
|
test('should prevent concurrent activations', async () => {
|
||||||
|
const getAllActiveIds = jest.spyOn(workflowRepository, 'getAllActiveIds');
|
||||||
|
|
||||||
|
workflowRepository.getAllActiveIds.mockImplementation(
|
||||||
|
async () => await new Promise((resolve) => setTimeout(() => resolve(['workflow-1']), 50)),
|
||||||
|
);
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
activeWorkflowManager.addActiveWorkflows('init'),
|
||||||
|
activeWorkflowManager.addActiveWorkflows('leadershipChange'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(getAllActiveIds).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -418,30 +418,42 @@ export class ActiveWorkflowManager {
|
|||||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private isActivationInProgress = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register as active in memory all workflows stored as `active`,
|
* Register as active in memory all workflows stored as `active`,
|
||||||
* only on instance init or (in multi-main setup) on leadership change.
|
* only on instance init or (in multi-main setup) on leadership change.
|
||||||
*/
|
*/
|
||||||
async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') {
|
async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') {
|
||||||
const dbWorkflowIds = await this.workflowRepository.getAllActiveIds();
|
if (this.isActivationInProgress) {
|
||||||
|
this.logger.debug(`Skipping activation - already in progress for mode: ${activationMode}`);
|
||||||
if (dbWorkflowIds.length === 0) return;
|
return;
|
||||||
|
|
||||||
if (this.instanceSettings.isLeader) {
|
|
||||||
this.logger.info('Start Active Workflows:');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize);
|
this.isActivationInProgress = true;
|
||||||
|
try {
|
||||||
|
const dbWorkflowIds = await this.workflowRepository.getAllActiveIds();
|
||||||
|
|
||||||
for (const batch of batches) {
|
if (dbWorkflowIds.length === 0) return;
|
||||||
const activationPromises = batch.map(async (dbWorkflowId) => {
|
|
||||||
await this.activateWorkflow(dbWorkflowId, activationMode);
|
|
||||||
});
|
|
||||||
|
|
||||||
await Promise.all(activationPromises);
|
if (this.instanceSettings.isLeader) {
|
||||||
|
this.logger.info('Start Active Workflows:');
|
||||||
|
}
|
||||||
|
|
||||||
|
const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize);
|
||||||
|
|
||||||
|
for (const batch of batches) {
|
||||||
|
const activationPromises = batch.map(async (dbWorkflowId) => {
|
||||||
|
await this.activateWorkflow(dbWorkflowId, activationMode);
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all(activationPromises);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.debug('Finished activating all workflows');
|
||||||
|
} finally {
|
||||||
|
this.isActivationInProgress = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug('Finished activating all workflows');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async activateWorkflow(
|
private async activateWorkflow(
|
||||||
|
|||||||
Reference in New Issue
Block a user