mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
refactor(core): Improve workflow activation logs (#15180)
This commit is contained in:
@@ -17,6 +17,7 @@ export const LOG_SCOPES = [
|
|||||||
'waiting-executions',
|
'waiting-executions',
|
||||||
'task-runner',
|
'task-runner',
|
||||||
'insights',
|
'insights',
|
||||||
|
'workflow-activation',
|
||||||
] as const;
|
] as const;
|
||||||
|
|
||||||
export type LogScope = (typeof LOG_SCOPES)[number];
|
export type LogScope = (typeof LOG_SCOPES)[number];
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import { Workflow } from 'n8n-workflow';
|
|||||||
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||||
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||||
import type { NodeTypes } from '@/node-types';
|
import type { NodeTypes } from '@/node-types';
|
||||||
|
import { mockLogger } from '@test/mocking';
|
||||||
|
|
||||||
describe('ActiveWorkflowManager', () => {
|
describe('ActiveWorkflowManager', () => {
|
||||||
let activeWorkflowManager: ActiveWorkflowManager;
|
let activeWorkflowManager: ActiveWorkflowManager;
|
||||||
@@ -23,7 +24,7 @@ describe('ActiveWorkflowManager', () => {
|
|||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
activeWorkflowManager = new ActiveWorkflowManager(
|
activeWorkflowManager = new ActiveWorkflowManager(
|
||||||
mock(),
|
mockLogger(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -137,12 +138,12 @@ describe('ActiveWorkflowManager', () => {
|
|||||||
);
|
);
|
||||||
workflowRepository.findById.mockResolvedValue(mock<WorkflowEntity>({ active: false }));
|
workflowRepository.findById.mockResolvedValue(mock<WorkflowEntity>({ active: false }));
|
||||||
|
|
||||||
const result = await activeWorkflowManager.add('some-id', mode);
|
const added = await activeWorkflowManager.add('some-id', mode);
|
||||||
|
|
||||||
expect(checkSpy).not.toHaveBeenCalled();
|
expect(checkSpy).not.toHaveBeenCalled();
|
||||||
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||||
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
|
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
|
||||||
expect(result).toBe(false);
|
expect(added).toEqual({ triggersAndPollers: false, webhooks: false });
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -85,7 +85,9 @@ export class ActiveWorkflowManager {
|
|||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly publisher: Publisher,
|
private readonly publisher: Publisher,
|
||||||
private readonly workflowsConfig: WorkflowsConfig,
|
private readonly workflowsConfig: WorkflowsConfig,
|
||||||
) {}
|
) {
|
||||||
|
this.logger = this.logger.scoped(['workflow-activation']);
|
||||||
|
}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
strict(
|
strict(
|
||||||
@@ -158,9 +160,7 @@ export class ActiveWorkflowManager {
|
|||||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
||||||
let path = '';
|
let path = '';
|
||||||
|
|
||||||
if (webhooks.length === 0) return;
|
if (webhooks.length === 0) return false;
|
||||||
|
|
||||||
this.logger.debug(`Adding webhooks for workflow "${workflow.name}" (ID ${workflow.id})`);
|
|
||||||
|
|
||||||
for (const webhookData of webhooks) {
|
for (const webhookData of webhooks) {
|
||||||
const node = workflow.getNode(webhookData.node) as INode;
|
const node = workflow.getNode(webhookData.node) as INode;
|
||||||
@@ -227,6 +227,12 @@ export class ActiveWorkflowManager {
|
|||||||
await this.webhookService.populateCache();
|
await this.webhookService.populateCache();
|
||||||
|
|
||||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||||
|
|
||||||
|
this.logger.debug(`Added webhooks for workflow "${workflow.name}" (ID ${workflow.id})`, {
|
||||||
|
workflowId: workflow.id,
|
||||||
|
});
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -434,7 +440,7 @@ export class ActiveWorkflowManager {
|
|||||||
await Promise.all(activationPromises);
|
await Promise.all(activationPromises);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug('Activated all trigger- and poller-based workflows');
|
this.logger.debug('Finished activating all workflows');
|
||||||
}
|
}
|
||||||
|
|
||||||
private async activateWorkflow(
|
private async activateWorkflow(
|
||||||
@@ -445,13 +451,14 @@ export class ActiveWorkflowManager {
|
|||||||
if (!dbWorkflow) return;
|
if (!dbWorkflow) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
|
const added = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
|
||||||
shouldPublish: false,
|
shouldPublish: false,
|
||||||
});
|
});
|
||||||
if (wasActivated) {
|
|
||||||
|
if (added.webhooks || added.triggersAndPollers) {
|
||||||
this.logger.info(` - ${formatWorkflow(dbWorkflow)})`);
|
this.logger.info(` - ${formatWorkflow(dbWorkflow)})`);
|
||||||
this.logger.info(' => Started');
|
this.logger.info(' => Started');
|
||||||
this.logger.debug(`Successfully started workflow ${formatWorkflow(dbWorkflow)}`, {
|
this.logger.debug(`Activated workflow ${formatWorkflow(dbWorkflow)}`, {
|
||||||
workflowName: dbWorkflow.name,
|
workflowName: dbWorkflow.name,
|
||||||
workflowId: dbWorkflow.id,
|
workflowId: dbWorkflow.id,
|
||||||
});
|
});
|
||||||
@@ -517,6 +524,8 @@ export class ActiveWorkflowManager {
|
|||||||
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
|
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
|
||||||
* but webhooks are registered by being entered in the `webhook_entity` table,
|
* but webhooks are registered by being entered in the `webhook_entity` table,
|
||||||
* since webhooks do not require continuous execution.
|
* since webhooks do not require continuous execution.
|
||||||
|
*
|
||||||
|
* Returns whether this operation added webhooks and/or triggers and pollers.
|
||||||
*/
|
*/
|
||||||
async add(
|
async add(
|
||||||
workflowId: WorkflowId,
|
workflowId: WorkflowId,
|
||||||
@@ -524,13 +533,15 @@ export class ActiveWorkflowManager {
|
|||||||
existingWorkflow?: WorkflowEntity,
|
existingWorkflow?: WorkflowEntity,
|
||||||
{ shouldPublish } = { shouldPublish: true },
|
{ shouldPublish } = { shouldPublish: true },
|
||||||
) {
|
) {
|
||||||
|
const added = { webhooks: false, triggersAndPollers: false };
|
||||||
|
|
||||||
if (this.instanceSettings.isMultiMain && shouldPublish) {
|
if (this.instanceSettings.isMultiMain && shouldPublish) {
|
||||||
void this.publisher.publishCommand({
|
void this.publisher.publishCommand({
|
||||||
command: 'add-webhooks-triggers-and-pollers',
|
command: 'add-webhooks-triggers-and-pollers',
|
||||||
payload: { workflowId },
|
payload: { workflowId },
|
||||||
});
|
});
|
||||||
|
|
||||||
return;
|
return added;
|
||||||
}
|
}
|
||||||
|
|
||||||
let workflow: Workflow;
|
let workflow: Workflow;
|
||||||
@@ -538,10 +549,6 @@ export class ActiveWorkflowManager {
|
|||||||
const shouldAddWebhooks = this.shouldAddWebhooks(activationMode);
|
const shouldAddWebhooks = this.shouldAddWebhooks(activationMode);
|
||||||
const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers();
|
const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers();
|
||||||
|
|
||||||
const shouldDisplayActivationMessage =
|
|
||||||
(shouldAddWebhooks || shouldAddTriggersAndPollers) &&
|
|
||||||
['init', 'leadershipChange'].includes(activationMode);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
|
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
|
||||||
|
|
||||||
@@ -554,18 +561,10 @@ export class ActiveWorkflowManager {
|
|||||||
if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.active) {
|
if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.active) {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Skipping workflow ${formatWorkflow(dbWorkflow)} as it is no longer active`,
|
`Skipping workflow ${formatWorkflow(dbWorkflow)} as it is no longer active`,
|
||||||
{
|
{ workflowId: dbWorkflow.id },
|
||||||
workflowId: dbWorkflow.id,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shouldDisplayActivationMessage) {
|
return added;
|
||||||
this.logger.debug(`Initializing active workflow ${formatWorkflow(dbWorkflow)} (startup)`, {
|
|
||||||
workflowName: dbWorkflow.name,
|
|
||||||
workflowId: dbWorkflow.id,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
workflow = new Workflow({
|
workflow = new Workflow({
|
||||||
@@ -591,11 +590,16 @@ export class ActiveWorkflowManager {
|
|||||||
const additionalData = await WorkflowExecuteAdditionalData.getBase();
|
const additionalData = await WorkflowExecuteAdditionalData.getBase();
|
||||||
|
|
||||||
if (shouldAddWebhooks) {
|
if (shouldAddWebhooks) {
|
||||||
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
added.webhooks = await this.addWebhooks(
|
||||||
|
workflow,
|
||||||
|
additionalData,
|
||||||
|
'trigger',
|
||||||
|
activationMode,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldAddTriggersAndPollers) {
|
if (shouldAddTriggersAndPollers) {
|
||||||
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||||
activationMode,
|
activationMode,
|
||||||
executionMode: 'trigger',
|
executionMode: 'trigger',
|
||||||
additionalData,
|
additionalData,
|
||||||
@@ -620,7 +624,7 @@ export class ActiveWorkflowManager {
|
|||||||
// id of them in the static data. So make sure that data gets persisted.
|
// id of them in the static data. So make sure that data gets persisted.
|
||||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||||
|
|
||||||
return shouldDisplayActivationMessage;
|
return added;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -863,24 +867,23 @@ export class ActiveWorkflowManager {
|
|||||||
activationMode,
|
activationMode,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
|
if (workflow.getTriggerNodes().length === 0 && workflow.getPollNodes().length === 0) {
|
||||||
this.logger.debug(`Adding triggers and pollers for workflow ${formatWorkflow(dbWorkflow)}`);
|
return false;
|
||||||
|
|
||||||
await this.activeWorkflows.add(
|
|
||||||
workflow.id,
|
|
||||||
workflow,
|
|
||||||
additionalData,
|
|
||||||
executionMode,
|
|
||||||
activationMode,
|
|
||||||
getTriggerFunctions,
|
|
||||||
getPollFunctions,
|
|
||||||
);
|
|
||||||
|
|
||||||
this.logger.debug(`Workflow ${formatWorkflow(dbWorkflow)} activated`, {
|
|
||||||
workflowId: dbWorkflow.id,
|
|
||||||
workflowName: dbWorkflow.name,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.activeWorkflows.add(
|
||||||
|
workflow.id,
|
||||||
|
workflow,
|
||||||
|
additionalData,
|
||||||
|
executionMode,
|
||||||
|
activationMode,
|
||||||
|
getTriggerFunctions,
|
||||||
|
getPollFunctions,
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.debug(`Added triggers and pollers for workflow ${formatWorkflow(dbWorkflow)}`);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeActivationError(workflowId: WorkflowId) {
|
async removeActivationError(workflowId: WorkflowId) {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import type { WebhookEntity } from '@n8n/db';
|
import type { WebhookEntity } from '@n8n/db';
|
||||||
import { Container } from '@n8n/di';
|
import { Container } from '@n8n/di';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { InstanceSettings, Logger } from 'n8n-core';
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import { FormTrigger } from 'n8n-nodes-base/nodes/Form/FormTrigger.node';
|
import { FormTrigger } from 'n8n-nodes-base/nodes/Form/FormTrigger.node';
|
||||||
import { ScheduleTrigger } from 'n8n-nodes-base/nodes/Schedule/ScheduleTrigger.node';
|
import { ScheduleTrigger } from 'n8n-nodes-base/nodes/Schedule/ScheduleTrigger.node';
|
||||||
import { NodeApiError, Workflow } from 'n8n-workflow';
|
import { NodeApiError, Workflow } from 'n8n-workflow';
|
||||||
@@ -32,7 +32,6 @@ import * as utils from './shared/utils/';
|
|||||||
import { mockInstance } from '../shared/mocking';
|
import { mockInstance } from '../shared/mocking';
|
||||||
|
|
||||||
mockInstance(ActiveExecutions);
|
mockInstance(ActiveExecutions);
|
||||||
mockInstance(Logger);
|
|
||||||
mockInstance(Push);
|
mockInstance(Push);
|
||||||
mockInstance(SecretsHelper);
|
mockInstance(SecretsHelper);
|
||||||
mockInstance(ExecutionService);
|
mockInstance(ExecutionService);
|
||||||
|
|||||||
Reference in New Issue
Block a user