mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 18:12:04 +00:00
refactor(core): Remove deprecated properties from orchestration service (#11251)
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import type { InstanceSettings } from 'n8n-core';
|
||||||
|
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import type { IExecutionResponse } from '@/interfaces';
|
import type { IExecutionResponse } from '@/interfaces';
|
||||||
@@ -13,6 +14,7 @@ describe('WaitTracker', () => {
|
|||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const multiMainSetup = mock<MultiMainSetup>();
|
const multiMainSetup = mock<MultiMainSetup>();
|
||||||
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
|
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
|
||||||
|
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
|
||||||
|
|
||||||
const execution = mock<IExecutionResponse>({
|
const execution = mock<IExecutionResponse>({
|
||||||
id: '123',
|
id: '123',
|
||||||
@@ -27,6 +29,7 @@ describe('WaitTracker', () => {
|
|||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
orchestrationService,
|
orchestrationService,
|
||||||
|
instanceSettings,
|
||||||
);
|
);
|
||||||
multiMainSetup.on.mockReturnThis();
|
multiMainSetup.on.mockReturnThis();
|
||||||
});
|
});
|
||||||
@@ -37,7 +40,6 @@ describe('WaitTracker', () => {
|
|||||||
|
|
||||||
describe('init()', () => {
|
describe('init()', () => {
|
||||||
it('should query DB for waiting executions if leader', async () => {
|
it('should query DB for waiting executions if leader', async () => {
|
||||||
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
|
|
||||||
waitTracker.init();
|
waitTracker.init();
|
||||||
@@ -120,7 +122,6 @@ describe('WaitTracker', () => {
|
|||||||
|
|
||||||
describe('multi-main setup', () => {
|
describe('multi-main setup', () => {
|
||||||
it('should start tracking if leader', () => {
|
it('should start tracking if leader', () => {
|
||||||
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
|
|
||||||
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
@@ -131,7 +132,14 @@ describe('WaitTracker', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should not start tracking if follower', () => {
|
it('should not start tracking if follower', () => {
|
||||||
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
|
const waitTracker = new WaitTracker(
|
||||||
|
mockLogger(),
|
||||||
|
executionRepository,
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
orchestrationService,
|
||||||
|
mock<InstanceSettings>({ isLeader: false }),
|
||||||
|
);
|
||||||
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||||
|
|
||||||
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions } from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
ExecutionError,
|
ExecutionError,
|
||||||
IDeferredPromise,
|
IDeferredPromise,
|
||||||
@@ -74,6 +74,7 @@ export class ActiveWorkflowManager {
|
|||||||
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
||||||
private readonly activeWorkflowsService: ActiveWorkflowsService,
|
private readonly activeWorkflowsService: ActiveWorkflowsService,
|
||||||
private readonly workflowExecutionService: WorkflowExecutionService,
|
private readonly workflowExecutionService: WorkflowExecutionService,
|
||||||
|
private readonly instanceSettings: InstanceSettings,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
@@ -423,7 +424,7 @@ export class ActiveWorkflowManager {
|
|||||||
|
|
||||||
if (dbWorkflows.length === 0) return;
|
if (dbWorkflows.length === 0) return;
|
||||||
|
|
||||||
if (this.orchestrationService.isLeader) {
|
if (this.instanceSettings.isLeader) {
|
||||||
this.logger.info(' ================================');
|
this.logger.info(' ================================');
|
||||||
this.logger.info(' Start Active Workflows:');
|
this.logger.info(' Start Active Workflows:');
|
||||||
this.logger.info(' ================================');
|
this.logger.info(' ================================');
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
|
|
||||||
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||||
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||||
import { Get, RestController } from '@/decorators';
|
import { Get, RestController } from '@/decorators';
|
||||||
@@ -9,6 +11,7 @@ export class DebugController {
|
|||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly orchestrationService: OrchestrationService,
|
||||||
private readonly activeWorkflowManager: ActiveWorkflowManager,
|
private readonly activeWorkflowManager: ActiveWorkflowManager,
|
||||||
private readonly workflowRepository: WorkflowRepository,
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
|
private readonly instanceSettings: InstanceSettings,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@Get('/multi-main-setup', { skipAuth: true })
|
@Get('/multi-main-setup', { skipAuth: true })
|
||||||
@@ -24,9 +27,9 @@ export class DebugController {
|
|||||||
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();
|
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();
|
||||||
|
|
||||||
return {
|
return {
|
||||||
instanceId: this.orchestrationService.instanceId,
|
instanceId: this.instanceSettings.instanceId,
|
||||||
leaderKey,
|
leaderKey,
|
||||||
isLeader: this.orchestrationService.isLeader,
|
isLeader: this.instanceSettings.isLeader,
|
||||||
activeWorkflows: {
|
activeWorkflows: {
|
||||||
webhooks, // webhook-based active workflows
|
webhooks, // webhook-based active workflows
|
||||||
triggersAndPollers, // poller- and trigger-based active workflows
|
triggersAndPollers, // poller- and trigger-based active workflows
|
||||||
|
|||||||
@@ -143,10 +143,7 @@ export class License {
|
|||||||
|
|
||||||
this.orchestrationService.setMultiMainSetupLicensed(isMultiMainLicensed ?? false);
|
this.orchestrationService.setMultiMainSetupLicensed(isMultiMainLicensed ?? false);
|
||||||
|
|
||||||
if (
|
if (this.orchestrationService.isMultiMainSetupEnabled && this.instanceSettings.isFollower) {
|
||||||
this.orchestrationService.isMultiMainSetupEnabled &&
|
|
||||||
this.orchestrationService.isFollower
|
|
||||||
) {
|
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
'[Multi-main setup] Instance is follower, skipping sending of "reload-license" command...',
|
'[Multi-main setup] Instance is follower, skipping sending of "reload-license" command...',
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -47,16 +47,6 @@ export class OrchestrationService {
|
|||||||
return config.getEnv('redis.queueModeId');
|
return config.getEnv('redis.queueModeId');
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @deprecated use InstanceSettings.isLeader */
|
|
||||||
get isLeader() {
|
|
||||||
return this.instanceSettings.isLeader;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @deprecated use InstanceSettings.isFollower */
|
|
||||||
get isFollower() {
|
|
||||||
return this.instanceSettings.isFollower;
|
|
||||||
}
|
|
||||||
|
|
||||||
sanityCheck() {
|
sanityCheck() {
|
||||||
return this.isInitialized && config.get('executions.mode') === 'queue';
|
return this.isInitialized && config.get('executions.mode') === 'queue';
|
||||||
}
|
}
|
||||||
@@ -144,7 +134,7 @@ export class OrchestrationService {
|
|||||||
|
|
||||||
if (activationMode === 'leadershipChange') return false;
|
if (activationMode === 'leadershipChange') return false;
|
||||||
|
|
||||||
return this.isLeader; // 'update' or 'activate'
|
return this.instanceSettings.isLeader; // 'update' or 'activate'
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -154,6 +144,6 @@ export class OrchestrationService {
|
|||||||
* triggers and pollers in memory, to ensure they are not duplicated.
|
* triggers and pollers in memory, to ensure they are not duplicated.
|
||||||
*/
|
*/
|
||||||
shouldAddTriggersAndPollers() {
|
shouldAddTriggersAndPollers() {
|
||||||
return this.isLeader;
|
return this.instanceSettings.isLeader;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,7 +37,8 @@ export class PruningService {
|
|||||||
* @important Requires `OrchestrationService` to be initialized.
|
* @important Requires `OrchestrationService` to be initialized.
|
||||||
*/
|
*/
|
||||||
init() {
|
init() {
|
||||||
const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;
|
const { isLeader } = this.instanceSettings;
|
||||||
|
const { isMultiMainSetupEnabled } = this.orchestrationService;
|
||||||
|
|
||||||
if (isLeader) this.startPruning();
|
if (isLeader) this.startPruning();
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import {
|
import {
|
||||||
ApplicationError,
|
ApplicationError,
|
||||||
ErrorReporterProxy as ErrorReporter,
|
ErrorReporterProxy as ErrorReporter,
|
||||||
@@ -28,6 +29,7 @@ export class WaitTracker {
|
|||||||
private readonly ownershipService: OwnershipService,
|
private readonly ownershipService: OwnershipService,
|
||||||
private readonly workflowRunner: WorkflowRunner,
|
private readonly workflowRunner: WorkflowRunner,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly orchestrationService: OrchestrationService,
|
||||||
|
private readonly instanceSettings: InstanceSettings,
|
||||||
) {
|
) {
|
||||||
this.logger = this.logger.withScope('executions');
|
this.logger = this.logger.withScope('executions');
|
||||||
}
|
}
|
||||||
@@ -40,7 +42,8 @@ export class WaitTracker {
|
|||||||
* @important Requires `OrchestrationService` to be initialized.
|
* @important Requires `OrchestrationService` to be initialized.
|
||||||
*/
|
*/
|
||||||
init() {
|
init() {
|
||||||
const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;
|
const { isLeader } = this.instanceSettings;
|
||||||
|
const { isMultiMainSetupEnabled } = this.orchestrationService;
|
||||||
|
|
||||||
if (isLeader) this.startTracking();
|
if (isLeader) this.startTracking();
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
|
import Container from 'typedi';
|
||||||
|
|
||||||
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||||
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
|
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
|
||||||
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||||
import { generateNanoId } from '@/databases/utils/generators';
|
import { generateNanoId } from '@/databases/utils/generators';
|
||||||
import { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee';
|
import { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
import { createOwner } from './shared/db/users';
|
import { createOwner } from './shared/db/users';
|
||||||
import { randomName } from './shared/random';
|
import { randomName } from './shared/random';
|
||||||
@@ -14,6 +16,8 @@ import { mockInstance } from '../shared/mocking';
|
|||||||
describe('DebugController', () => {
|
describe('DebugController', () => {
|
||||||
const workflowRepository = mockInstance(WorkflowRepository);
|
const workflowRepository = mockInstance(WorkflowRepository);
|
||||||
const activeWorkflowManager = mockInstance(ActiveWorkflowManager);
|
const activeWorkflowManager = mockInstance(ActiveWorkflowManager);
|
||||||
|
const instanceSettings = Container.get(InstanceSettings);
|
||||||
|
instanceSettings.markAsLeader();
|
||||||
|
|
||||||
let testServer = setupTestServer({ endpointGroups: ['debug'] });
|
let testServer = setupTestServer({ endpointGroups: ['debug'] });
|
||||||
let ownerAgent: SuperAgentTest;
|
let ownerAgent: SuperAgentTest;
|
||||||
@@ -30,7 +34,7 @@ describe('DebugController', () => {
|
|||||||
const webhooks = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
|
const webhooks = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
|
||||||
const triggersAndPollers = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
|
const triggersAndPollers = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
|
||||||
const activationErrors = { [workflowId]: 'Failed to activate' };
|
const activationErrors = { [workflowId]: 'Failed to activate' };
|
||||||
const instanceId = 'main-71JdWtq306epIFki';
|
const { instanceId } = instanceSettings;
|
||||||
const leaderKey = 'some-leader-key';
|
const leaderKey = 'some-leader-key';
|
||||||
|
|
||||||
workflowRepository.findIn.mockResolvedValue(triggersAndPollers);
|
workflowRepository.findIn.mockResolvedValue(triggersAndPollers);
|
||||||
@@ -38,9 +42,7 @@ describe('DebugController', () => {
|
|||||||
activeWorkflowManager.allActiveInMemory.mockReturnValue([workflowId]);
|
activeWorkflowManager.allActiveInMemory.mockReturnValue([workflowId]);
|
||||||
activeWorkflowManager.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors);
|
activeWorkflowManager.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors);
|
||||||
|
|
||||||
jest.spyOn(OrchestrationService.prototype, 'instanceId', 'get').mockReturnValue(instanceId);
|
|
||||||
jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey);
|
jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey);
|
||||||
jest.spyOn(OrchestrationService.prototype, 'isLeader', 'get').mockReturnValue(true);
|
|
||||||
|
|
||||||
const response = await ownerAgent.get('/debug/multi-main-setup').expect(200);
|
const response = await ownerAgent.get('/debug/multi-main-setup').expect(200);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user