feat(core): Increase Cron observability (#17626)

This commit is contained in:
Iván Ovejero
2025-07-28 11:54:33 +02:00
committed by GitHub
parent 921cdb6fd0
commit 08c38a76f3
12 changed files with 175 additions and 40 deletions

View File

@@ -1,3 +1,4 @@
import type { Logger } from '@n8n/backend-common';
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
@@ -5,6 +6,8 @@ import type { InstanceSettings } from '@/instance-settings';
import { ScheduledTaskManager } from '../scheduled-task-manager';
const logger = mock<Logger>({ scoped: jest.fn().mockReturnValue(mock<Logger>()) });
describe('ScheduledTaskManager', () => {
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const workflow = mock<Workflow>({ timezone: 'GMT' });
@@ -16,14 +19,14 @@ describe('ScheduledTaskManager', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
scheduledTaskManager = new ScheduledTaskManager(instanceSettings);
scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock());
});
it('should throw when workflow timezone is invalid', () => {
expect(() =>
scheduledTaskManager.registerCron(
mock<Workflow>({ timezone: 'somewhere' }),
everyMinute,
{ expression: everyMinute },
onTick,
),
).toThrow('Invalid timezone.');
@@ -36,17 +39,21 @@ describe('ScheduledTaskManager', () => {
).toThrow();
});
it('should register valid CronJobs', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
it('should register valid CronJobs', () => {
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).toHaveBeenCalledTimes(10);
});
it('should should not invoke on follower instances', async () => {
scheduledTaskManager = new ScheduledTaskManager(mock<InstanceSettings>({ isLeader: false }));
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
it('should not invoke on follower instances', () => {
scheduledTaskManager = new ScheduledTaskManager(
mock<InstanceSettings>({ isLeader: false }),
logger,
mock(),
);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
@@ -54,18 +61,26 @@ describe('ScheduledTaskManager', () => {
});
it('should deregister CronJobs for a workflow', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
expect(scheduledTaskManager.cronJobs.get(workflow.id)?.length).toBe(3);
expect(scheduledTaskManager.cronMap.get(workflow.id)).toHaveLength(3);
scheduledTaskManager.deregisterCrons(workflow.id);
expect(scheduledTaskManager.cronJobs.get(workflow.id)?.length).toBe(0);
expect(scheduledTaskManager.cronMap.get(workflow.id)).toBeUndefined();
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).not.toHaveBeenCalled();
});
it('should not set up log interval when activeInterval is 0', () => {
const configWithZeroInterval = mock({ activeInterval: 0 });
const manager = new ScheduledTaskManager(instanceSettings, logger, configWithZeroInterval);
// @ts-expect-error Private property
expect(manager.logInterval).toBeUndefined();
});
});

View File

@@ -149,7 +149,7 @@ export class ActiveWorkflows {
};
// Get all the trigger times
const cronTimes = (pollTimes.item || []).map(toCronExpression);
const cronExpressions = (pollTimes.item || []).map(toCronExpression);
// The trigger function to execute when the cron-time got reached
const executeTrigger = async (testingTrigger = false) => {
this.logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, {
@@ -177,15 +177,15 @@ export class ActiveWorkflows {
// Execute the trigger directly to be able to know if it works
await executeTrigger(true);
for (const cronTime of cronTimes) {
const cronTimeParts = cronTime.split(' ');
for (const expression of cronExpressions) {
const cronTimeParts = expression.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
throw new ApplicationError(
'The polling interval is too short. It has to be at least a minute.',
);
}
this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger);
this.scheduledTaskManager.registerCron(workflow, { expression }, executeTrigger);
}
}

View File

@@ -19,11 +19,11 @@ describe('getSchedulingFunctions', () => {
describe('registerCron', () => {
it('should invoke scheduledTaskManager.registerCron', () => {
schedulingFunctions.registerCron(cronExpression, onTick);
schedulingFunctions.registerCron({ expression: cronExpression }, onTick);
expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(
workflow,
cronExpression,
{ expression: cronExpression },
onTick,
);
});

View File

@@ -6,7 +6,6 @@ import { ScheduledTaskManager } from '../../scheduled-task-manager';
export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
registerCron: (cron, onTick) => scheduledTaskManager.registerCron(workflow, cron, onTick),
};
};

View File

@@ -1,44 +1,115 @@
import { Logger } from '@n8n/backend-common';
import { CronLoggingConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import { Service } from '@n8n/di';
import { CronJob } from 'cron';
import type { CronExpression, Workflow } from 'n8n-workflow';
import type { Cron, Workflow } from 'n8n-workflow';
import { InstanceSettings } from '@/instance-settings';
@Service()
export class ScheduledTaskManager {
constructor(private readonly instanceSettings: InstanceSettings) {}
readonly cronMap = new Map<string, Array<{ job: CronJob; displayableCron: string }>>();
readonly cronJobs = new Map<string, CronJob[]>();
private readonly logInterval?: NodeJS.Timeout;
constructor(
private readonly instanceSettings: InstanceSettings,
private readonly logger: Logger,
private readonly config: CronLoggingConfig,
) {
this.logger = this.logger.scoped('cron');
if (this.config.activeInterval === 0) return;
this.logInterval = setInterval(
() => this.logActiveCrons(),
this.config.activeInterval * Time.minutes.toMilliseconds,
);
}
private logActiveCrons() {
const activeCrons: Record<string, string[]> = {};
for (const [workflowId, cronJobs] of this.cronMap) {
activeCrons[`workflow-${workflowId}`] = cronJobs.map(
({ displayableCron }) => displayableCron,
);
}
if (Object.keys(activeCrons).length === 0) return;
this.logger.debug('Currently active crons', { activeCrons });
}
registerCron(workflow: Workflow, { expression, recurrence }: Cron, onTick: () => void) {
const recurrenceStr = recurrence?.activated
? `every ${recurrence.intervalSize} ${recurrence.typeInterval}`
: undefined;
const displayableCron = recurrenceStr ? `${expression} (${recurrenceStr})` : expression;
registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
const cronJob = new CronJob(
cronExpression,
expression,
() => {
if (this.instanceSettings.isLeader) onTick();
if (this.instanceSettings.isLeader) {
this.logger.debug('Executing cron for workflow', {
workflowId: workflow.id,
cron: displayableCron,
instanceRole: this.instanceSettings.instanceRole,
});
onTick();
}
},
undefined,
true,
workflow.timezone,
);
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
if (cronJobsForWorkflow) {
cronJobsForWorkflow.push(cronJob);
const workflowCronEntries = this.cronMap.get(workflow.id);
const cronEntry = { job: cronJob, displayableCron };
if (workflowCronEntries) {
workflowCronEntries.push(cronEntry);
} else {
this.cronJobs.set(workflow.id, [cronJob]);
this.cronMap.set(workflow.id, [cronEntry]);
}
this.logger.debug('Registered cron for workflow', {
workflowId: workflow.id,
cron: displayableCron,
instanceRole: this.instanceSettings.instanceRole,
});
}
deregisterCrons(workflowId: string) {
const cronJobs = this.cronJobs.get(workflowId) ?? [];
const cronJobs = this.cronMap.get(workflowId) ?? [];
if (cronJobs.length === 0) return;
const crons: string[] = [];
while (cronJobs.length) {
const cronJob = cronJobs.pop();
if (cronJob) cronJob.stop();
const cronEntry = cronJobs.pop();
if (cronEntry) {
crons.push(cronEntry.displayableCron);
cronEntry.job.stop();
}
}
this.cronMap.delete(workflowId);
this.logger.info('Deregistered all crons for workflow', {
workflowId,
crons,
instanceRole: this.instanceSettings.instanceRole,
});
}
deregisterAllCrons() {
for (const workflowId of Object.keys(this.cronJobs)) {
for (const workflowId of this.cronMap.keys()) {
this.deregisterCrons(workflowId);
}
if (this.logInterval) clearInterval(this.logInterval);
}
}