fix(core): Protect against duplicate cron registration (#18005)

This commit is contained in:
Iván Ovejero
2025-08-06 15:35:52 +02:00
committed by GitHub
parent 32f47948d6
commit 948ebe6702
11 changed files with 264 additions and 98 deletions

View File

@@ -196,7 +196,7 @@ describe('ActiveWorkflows', () => {
// Get the executeTrigger function that was registered
const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0];
const executeTrigger = registerCronCall[2] as () => Promise<void>;
const executeTrigger = registerCronCall[1] as () => Promise<void>;
// Execute the trigger function to simulate a regular poll
await executeTrigger();

View File

@@ -1,6 +1,6 @@
import type { Logger } from '@n8n/backend-common';
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
import type { CronContext, Workflow } from 'n8n-workflow';
import type { InstanceSettings } from '@/instance-settings';
@@ -12,6 +12,7 @@ describe('ScheduledTaskManager', () => {
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const workflow = mock<Workflow>({ timezone: 'GMT' });
const everyMinute = '0 * * * * *';
const onTick = jest.fn();
let scheduledTaskManager: ScheduledTaskManager;
@@ -19,14 +20,33 @@ describe('ScheduledTaskManager', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock());
scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock(), mock());
});
it('should not register duplicate crons', () => {
const ctx: CronContext = {
workflowId: workflow.id,
nodeId: 'test-node-id',
timezone: workflow.timezone,
expression: everyMinute,
};
scheduledTaskManager.registerCron(ctx, onTick);
expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(1);
scheduledTaskManager.registerCron(ctx, onTick);
expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(1);
});
it('should throw when workflow timezone is invalid', () => {
expect(() =>
scheduledTaskManager.registerCron(
mock<Workflow>({ timezone: 'somewhere' }),
{ expression: everyMinute },
{
workflowId: workflow.id,
nodeId: 'test-node-id',
timezone: 'somewhere',
expression: everyMinute,
},
onTick,
),
).toThrow('Invalid timezone.');
@@ -40,7 +60,15 @@ describe('ScheduledTaskManager', () => {
});
it('should register valid CronJobs', () => {
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
scheduledTaskManager.registerCron(
{
workflowId: workflow.id,
nodeId: 'test-node-id',
timezone: workflow.timezone,
expression: everyMinute,
},
onTick,
);
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
@@ -52,24 +80,52 @@ describe('ScheduledTaskManager', () => {
mock<InstanceSettings>({ isLeader: false }),
logger,
mock(),
mock(),
);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
const ctx: CronContext = {
workflowId: workflow.id,
nodeId: 'test-node-id',
timezone: workflow.timezone,
expression: everyMinute,
};
scheduledTaskManager.registerCron(ctx, onTick);
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).not.toHaveBeenCalled();
});
it('should deregister CronJobs for a workflow', async () => {
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick);
it('should deregister CronJobs for a workflow', () => {
const ctx1: CronContext = {
workflowId: workflow.id,
nodeId: 'test-node-id-1',
timezone: workflow.timezone,
expression: everyMinute,
};
const ctx2: CronContext = {
workflowId: workflow.id,
nodeId: 'test-node-id-2',
timezone: workflow.timezone,
expression: everyMinute,
};
const ctx3: CronContext = {
workflowId: workflow.id,
nodeId: 'test-node-id-3',
timezone: workflow.timezone,
expression: everyMinute,
};
expect(scheduledTaskManager.cronMap.get(workflow.id)).toHaveLength(3);
scheduledTaskManager.registerCron(ctx1, onTick);
scheduledTaskManager.registerCron(ctx2, onTick);
scheduledTaskManager.registerCron(ctx3, onTick);
expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(3);
scheduledTaskManager.deregisterCrons(workflow.id);
expect(scheduledTaskManager.cronMap.get(workflow.id)).toBeUndefined();
expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)).toBeUndefined();
expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
@@ -78,7 +134,12 @@ describe('ScheduledTaskManager', () => {
it('should not set up log interval when activeInterval is 0', () => {
const configWithZeroInterval = mock({ activeInterval: 0 });
const manager = new ScheduledTaskManager(instanceSettings, logger, configWithZeroInterval);
const manager = new ScheduledTaskManager(
instanceSettings,
logger,
configWithZeroInterval,
mock(),
);
// @ts-expect-error Private property
expect(manager.logInterval).toBeUndefined();

View File

@@ -1,6 +1,7 @@
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import type {
CronContext,
INode,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
@@ -10,9 +11,9 @@ import type {
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
ApplicationError,
toCronExpression,
TriggerCloseError,
UserError,
WorkflowActivationError,
WorkflowDeactivationError,
} from 'n8n-workflow';
@@ -178,14 +179,18 @@ export class ActiveWorkflows {
await executeTrigger(true);
for (const expression of cronExpressions) {
const cronTimeParts = expression.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
throw new ApplicationError(
'The polling interval is too short. It has to be at least a minute.',
);
if (expression.split(' ').at(0)?.includes('*')) {
throw new UserError('The polling interval is too short. It has to be at least a minute.');
}
this.scheduledTaskManager.registerCron(workflow, { expression }, executeTrigger);
const ctx: CronContext = {
workflowId: workflow.id,
timezone: workflow.timezone,
nodeId: node.id,
expression,
};
this.scheduledTaskManager.registerCron(ctx, executeTrigger);
}
}

View File

@@ -42,7 +42,7 @@ export class PollContext extends NodeExecutionContext implements IPollFunctions
returnJsonArray,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
...getSchedulingFunctions(workflow.id, workflow.timezone, node.id),
};
}

View File

@@ -44,7 +44,7 @@ export class TriggerContext extends NodeExecutionContext implements ITriggerFunc
...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
...getSchedulingFunctions(workflow.id, workflow.timezone, node.id),
};
}

View File

@@ -1,5 +1,5 @@
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
import type { CronContext, Workflow } from 'n8n-workflow';
import { mockInstance } from '@test/utils';
@@ -7,11 +7,15 @@ import { ScheduledTaskManager } from '../../../scheduled-task-manager';
import { getSchedulingFunctions } from '../scheduling-helper-functions';
describe('getSchedulingFunctions', () => {
const workflow = mock<Workflow>({ id: 'test-workflow' });
const workflow = mock<Workflow>({ id: 'test-workflow', timezone: 'Europe/Berlin' });
const cronExpression = '* * * * * 0';
const onTick = jest.fn();
const scheduledTaskManager = mockInstance(ScheduledTaskManager);
const schedulingFunctions = getSchedulingFunctions(workflow);
const schedulingFunctions = getSchedulingFunctions(
workflow.id,
workflow.timezone,
'test-node-id',
);
it('should return scheduling functions', () => {
expect(typeof schedulingFunctions.registerCron).toBe('function');
@@ -19,13 +23,16 @@ describe('getSchedulingFunctions', () => {
describe('registerCron', () => {
it('should invoke scheduledTaskManager.registerCron', () => {
const ctx: CronContext = {
nodeId: 'test-node-id',
expression: cronExpression,
workflowId: 'test-workflow',
timezone: 'Europe/Berlin',
};
schedulingFunctions.registerCron({ expression: cronExpression }, onTick);
expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(
workflow,
{ expression: cronExpression },
onTick,
);
expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(ctx, onTick);
});
});
});

View File

@@ -1,11 +1,25 @@
import { Container } from '@n8n/di';
import type { SchedulingFunctions, Workflow } from 'n8n-workflow';
import type { SchedulingFunctions, Workflow, CronContext, Cron } from 'n8n-workflow';
import { ScheduledTaskManager } from '../../scheduled-task-manager';
export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
export const getSchedulingFunctions = (
workflowId: Workflow['id'],
timezone: Workflow['timezone'],
nodeId: string,
): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cron, onTick) => scheduledTaskManager.registerCron(workflow, cron, onTick),
registerCron: ({ expression, recurrence }: Cron, onTick) => {
const ctx: CronContext = {
expression,
recurrence,
nodeId,
workflowId,
timezone,
};
return scheduledTaskManager.registerCron(ctx, onTick);
},
};
};

View File

@@ -3,113 +3,160 @@ import { CronLoggingConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import { Service } from '@n8n/di';
import { CronJob } from 'cron';
import type { Cron, Workflow } from 'n8n-workflow';
import type { CronContext, Workflow } from 'n8n-workflow';
import { ErrorReporter } from '@/errors';
import { InstanceSettings } from '@/instance-settings';
type CronKey = string; // see `ScheduledTaskManager.toCronKey`
type Cron = { job: CronJob; summary: string; ctx: CronContext };
type CronsByWorkflow = Map<Workflow['id'], Map<CronKey, Cron>>;
@Service()
export class ScheduledTaskManager {
readonly cronMap = new Map<string, Array<{ job: CronJob; displayableCron: string }>>();
readonly cronsByWorkflow: CronsByWorkflow = new Map();
private readonly logInterval?: NodeJS.Timeout;
private logInterval?: NodeJS.Timeout;
/** Crons currently active instance-wide, to display in logs. */
private get loggableCrons() {
const loggableCrons: Record<string, string[]> = {};
for (const [workflowId, crons] of this.cronsByWorkflow) {
loggableCrons[`workflowId-${workflowId}`] = Array.from(crons.values()).map(
({ summary }) => summary,
);
}
return loggableCrons;
}
constructor(
private readonly instanceSettings: InstanceSettings,
private readonly logger: Logger,
private readonly config: CronLoggingConfig,
{ activeInterval }: CronLoggingConfig,
private readonly errorReporter: ErrorReporter,
) {
this.logger = this.logger.scoped('cron');
if (this.config.activeInterval === 0) return;
if (activeInterval === 0) return;
this.logInterval = setInterval(
() => this.logActiveCrons(),
this.config.activeInterval * Time.minutes.toMilliseconds,
);
this.logInterval = setInterval(() => {
if (Object.keys(this.loggableCrons).length === 0) return;
this.logger.debug('Currently active crons', { active: this.loggableCrons });
}, activeInterval * Time.minutes.toMilliseconds);
}
private logActiveCrons() {
const activeCrons: Record<string, string[]> = {};
for (const [workflowId, cronJobs] of this.cronMap) {
activeCrons[`workflow-${workflowId}`] = cronJobs.map(
({ displayableCron }) => displayableCron,
);
registerCron(ctx: CronContext, onTick: () => void) {
const { workflowId, timezone, nodeId, expression, recurrence } = ctx;
const summary = recurrence?.activated
? `${expression} (every ${recurrence.intervalSize} ${recurrence.typeInterval})`
: expression;
const workflowCrons = this.cronsByWorkflow.get(workflowId);
const key = this.toCronKey({ workflowId, nodeId, expression, timezone, recurrence });
if (workflowCrons?.has(key)) {
this.errorReporter.error('Skipped registration for already registered cron', {
tags: { cron: 'duplicate' },
extra: {
workflowId,
timezone,
nodeId,
expression,
recurrence,
instanceRole: this.instanceSettings.instanceRole,
},
});
return;
}
if (Object.keys(activeCrons).length === 0) return;
this.logger.debug('Currently active crons', { activeCrons });
}
registerCron(workflow: Workflow, { expression, recurrence }: Cron, onTick: () => void) {
const recurrenceStr = recurrence?.activated
? `every ${recurrence.intervalSize} ${recurrence.typeInterval}`
: undefined;
const displayableCron = recurrenceStr ? `${expression} (${recurrenceStr})` : expression;
const cronJob = new CronJob(
const job = new CronJob(
expression,
() => {
if (this.instanceSettings.isLeader) {
this.logger.debug('Executing cron for workflow', {
workflowId: workflow.id,
cron: displayableCron,
instanceRole: this.instanceSettings.instanceRole,
});
onTick();
}
if (!this.instanceSettings.isLeader) return;
this.logger.debug('Executing cron for workflow', {
workflowId,
nodeId,
cron: summary,
instanceRole: this.instanceSettings.instanceRole,
});
onTick();
},
undefined,
true,
workflow.timezone,
timezone,
);
const workflowCronEntries = this.cronMap.get(workflow.id);
const cronEntry = { job: cronJob, displayableCron };
const cron: Cron = { job, summary, ctx };
if (workflowCronEntries) {
workflowCronEntries.push(cronEntry);
if (!workflowCrons) {
this.cronsByWorkflow.set(workflowId, new Map([[key, cron]]));
} else {
this.cronMap.set(workflow.id, [cronEntry]);
workflowCrons.set(key, cron);
}
this.logger.debug('Registered cron for workflow', {
workflowId: workflow.id,
cron: displayableCron,
workflowId,
cron: summary,
instanceRole: this.instanceSettings.instanceRole,
});
}
deregisterCrons(workflowId: string) {
const cronJobs = this.cronMap.get(workflowId) ?? [];
const workflowCrons = this.cronsByWorkflow.get(workflowId);
if (cronJobs.length === 0) return;
if (!workflowCrons || workflowCrons.size === 0) return;
const crons: string[] = [];
const summaries: string[] = [];
while (cronJobs.length) {
const cronEntry = cronJobs.pop();
if (cronEntry) {
crons.push(cronEntry.displayableCron);
cronEntry.job.stop();
}
for (const cron of workflowCrons.values()) {
summaries.push(cron.summary);
cron.job.stop();
}
this.cronMap.delete(workflowId);
this.cronsByWorkflow.delete(workflowId);
this.logger.info('Deregistered all crons for workflow', {
workflowId,
crons,
crons: summaries,
instanceRole: this.instanceSettings.instanceRole,
});
}
deregisterAllCrons() {
for (const workflowId of this.cronMap.keys()) {
for (const workflowId of this.cronsByWorkflow.keys()) {
this.deregisterCrons(workflowId);
}
if (this.logInterval) clearInterval(this.logInterval);
clearInterval(this.logInterval);
this.logInterval = undefined;
}
private toCronKey(ctx: CronContext): CronKey {
const { recurrence, ...rest } = ctx;
const flattened: Record<string, unknown> = !recurrence
? rest
: {
...rest,
recurrenceActivated: recurrence.activated,
...(recurrence.activated && {
recurrenceIndex: recurrence.index,
recurrenceIntervalSize: recurrence.intervalSize,
recurrenceTypeInterval: recurrence.typeInterval,
}),
};
const sorted = Object.keys(flattened)
.sort()
.reduce<Record<string, unknown>>((result, key) => {
result[key] = flattened[key];
return result;
}, {});
return JSON.stringify(sorted);
}
}