diff --git a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts index 022ef022cf..51e4b7b192 100644 --- a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts +++ b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts @@ -196,7 +196,7 @@ describe('ActiveWorkflows', () => { // Get the executeTrigger function that was registered const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0]; - const executeTrigger = registerCronCall[2] as () => Promise; + const executeTrigger = registerCronCall[1] as () => Promise; // Execute the trigger function to simulate a regular poll await executeTrigger(); diff --git a/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts b/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts index f3836a9376..782cddaa24 100644 --- a/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts +++ b/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts @@ -1,6 +1,6 @@ import type { Logger } from '@n8n/backend-common'; import { mock } from 'jest-mock-extended'; -import type { Workflow } from 'n8n-workflow'; +import type { CronContext, Workflow } from 'n8n-workflow'; import type { InstanceSettings } from '@/instance-settings'; @@ -12,6 +12,7 @@ describe('ScheduledTaskManager', () => { const instanceSettings = mock({ isLeader: true }); const workflow = mock({ timezone: 'GMT' }); const everyMinute = '0 * * * * *'; + const onTick = jest.fn(); let scheduledTaskManager: ScheduledTaskManager; @@ -19,14 +20,33 @@ describe('ScheduledTaskManager', () => { beforeEach(() => { jest.clearAllMocks(); jest.useFakeTimers(); - scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock()); + scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock(), mock()); + }); + + it('should not register duplicate crons', () => { + const ctx: CronContext = { + workflowId: workflow.id, + nodeId: 'test-node-id', + timezone: workflow.timezone, + expression: everyMinute, + }; + + scheduledTaskManager.registerCron(ctx, onTick); + expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(1); + + scheduledTaskManager.registerCron(ctx, onTick); + expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(1); }); it('should throw when workflow timezone is invalid', () => { expect(() => scheduledTaskManager.registerCron( - mock({ timezone: 'somewhere' }), - { expression: everyMinute }, + { + workflowId: workflow.id, + nodeId: 'test-node-id', + timezone: 'somewhere', + expression: everyMinute, + }, onTick, ), ).toThrow('Invalid timezone.'); @@ -40,7 +60,15 @@ describe('ScheduledTaskManager', () => { }); it('should register valid CronJobs', () => { - scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); + scheduledTaskManager.registerCron( + { + workflowId: workflow.id, + nodeId: 'test-node-id', + timezone: workflow.timezone, + expression: everyMinute, + }, + onTick, + ); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes @@ -52,24 +80,52 @@ describe('ScheduledTaskManager', () => { mock({ isLeader: false }), logger, mock(), + mock(), ); - scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); + + const ctx: CronContext = { + workflowId: workflow.id, + nodeId: 'test-node-id', + timezone: workflow.timezone, + expression: everyMinute, + }; + + scheduledTaskManager.registerCron(ctx, onTick); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes expect(onTick).not.toHaveBeenCalled(); }); - it('should deregister CronJobs for a workflow', async () => { - scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); - scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); - scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); + it('should deregister CronJobs for a workflow', () => { + const ctx1: CronContext = { + workflowId: workflow.id, + nodeId: 'test-node-id-1', + timezone: workflow.timezone, + expression: everyMinute, + }; + const ctx2: CronContext = { + workflowId: workflow.id, + nodeId: 'test-node-id-2', + timezone: workflow.timezone, + expression: everyMinute, + }; + const ctx3: CronContext = { + workflowId: workflow.id, + nodeId: 'test-node-id-3', + timezone: workflow.timezone, + expression: everyMinute, + }; - expect(scheduledTaskManager.cronMap.get(workflow.id)).toHaveLength(3); + scheduledTaskManager.registerCron(ctx1, onTick); + scheduledTaskManager.registerCron(ctx2, onTick); + scheduledTaskManager.registerCron(ctx3, onTick); + + expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)?.size).toBe(3); scheduledTaskManager.deregisterCrons(workflow.id); - expect(scheduledTaskManager.cronMap.get(workflow.id)).toBeUndefined(); + expect(scheduledTaskManager.cronsByWorkflow.get(workflow.id)).toBeUndefined(); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes @@ -78,7 +134,12 @@ describe('ScheduledTaskManager', () => { it('should not set up log interval when activeInterval is 0', () => { const configWithZeroInterval = mock({ activeInterval: 0 }); - const manager = new ScheduledTaskManager(instanceSettings, logger, configWithZeroInterval); + const manager = new ScheduledTaskManager( + instanceSettings, + logger, + configWithZeroInterval, + mock(), + ); // @ts-expect-error Private property expect(manager.logInterval).toBeUndefined(); diff --git a/packages/core/src/execution-engine/active-workflows.ts b/packages/core/src/execution-engine/active-workflows.ts index 7fd1e6a0e9..c908ab0911 100644 --- a/packages/core/src/execution-engine/active-workflows.ts +++ b/packages/core/src/execution-engine/active-workflows.ts @@ -1,6 +1,7 @@ import { Logger } from '@n8n/backend-common'; import { Service } from '@n8n/di'; import type { + CronContext, INode, ITriggerResponse, IWorkflowExecuteAdditionalData, @@ -10,9 +11,9 @@ import type { WorkflowExecuteMode, } from 'n8n-workflow'; import { - ApplicationError, toCronExpression, TriggerCloseError, + UserError, WorkflowActivationError, WorkflowDeactivationError, } from 'n8n-workflow'; @@ -178,14 +179,18 @@ export class ActiveWorkflows { await executeTrigger(true); for (const expression of cronExpressions) { - const cronTimeParts = expression.split(' '); - if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) { - throw new ApplicationError( - 'The polling interval is too short. It has to be at least a minute.', - ); + if (expression.split(' ').at(0)?.includes('*')) { + throw new UserError('The polling interval is too short. It has to be at least a minute.'); } - this.scheduledTaskManager.registerCron(workflow, { expression }, executeTrigger); + const ctx: CronContext = { + workflowId: workflow.id, + timezone: workflow.timezone, + nodeId: node.id, + expression, + }; + + this.scheduledTaskManager.registerCron(ctx, executeTrigger); } } diff --git a/packages/core/src/execution-engine/node-execution-context/poll-context.ts b/packages/core/src/execution-engine/node-execution-context/poll-context.ts index b96991a5d5..80221d3774 100644 --- a/packages/core/src/execution-engine/node-execution-context/poll-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/poll-context.ts @@ -42,7 +42,7 @@ export class PollContext extends NodeExecutionContext implements IPollFunctions returnJsonArray, ...getRequestHelperFunctions(workflow, node, additionalData), ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getSchedulingFunctions(workflow), + ...getSchedulingFunctions(workflow.id, workflow.timezone, node.id), }; } diff --git a/packages/core/src/execution-engine/node-execution-context/trigger-context.ts b/packages/core/src/execution-engine/node-execution-context/trigger-context.ts index 2ecf25479b..2ed7819644 100644 --- a/packages/core/src/execution-engine/node-execution-context/trigger-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/trigger-context.ts @@ -44,7 +44,7 @@ export class TriggerContext extends NodeExecutionContext implements ITriggerFunc ...getSSHTunnelFunctions(), ...getRequestHelperFunctions(workflow, node, additionalData), ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getSchedulingFunctions(workflow), + ...getSchedulingFunctions(workflow.id, workflow.timezone, node.id), }; } diff --git a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts index 5c65225b25..04b87fdd05 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts @@ -1,5 +1,5 @@ import { mock } from 'jest-mock-extended'; -import type { Workflow } from 'n8n-workflow'; +import type { CronContext, Workflow } from 'n8n-workflow'; import { mockInstance } from '@test/utils'; @@ -7,11 +7,15 @@ import { ScheduledTaskManager } from '../../../scheduled-task-manager'; import { getSchedulingFunctions } from '../scheduling-helper-functions'; describe('getSchedulingFunctions', () => { - const workflow = mock({ id: 'test-workflow' }); + const workflow = mock({ id: 'test-workflow', timezone: 'Europe/Berlin' }); const cronExpression = '* * * * * 0'; const onTick = jest.fn(); const scheduledTaskManager = mockInstance(ScheduledTaskManager); - const schedulingFunctions = getSchedulingFunctions(workflow); + const schedulingFunctions = getSchedulingFunctions( + workflow.id, + workflow.timezone, + 'test-node-id', + ); it('should return scheduling functions', () => { expect(typeof schedulingFunctions.registerCron).toBe('function'); @@ -19,13 +23,16 @@ describe('getSchedulingFunctions', () => { describe('registerCron', () => { it('should invoke scheduledTaskManager.registerCron', () => { + const ctx: CronContext = { + nodeId: 'test-node-id', + expression: cronExpression, + workflowId: 'test-workflow', + timezone: 'Europe/Berlin', + }; + schedulingFunctions.registerCron({ expression: cronExpression }, onTick); - expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith( - workflow, - { expression: cronExpression }, - onTick, - ); + expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(ctx, onTick); }); }); }); diff --git a/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts b/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts index 64363f6eb0..a3483cb63a 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts @@ -1,11 +1,25 @@ import { Container } from '@n8n/di'; -import type { SchedulingFunctions, Workflow } from 'n8n-workflow'; +import type { SchedulingFunctions, Workflow, CronContext, Cron } from 'n8n-workflow'; import { ScheduledTaskManager } from '../../scheduled-task-manager'; -export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => { +export const getSchedulingFunctions = ( + workflowId: Workflow['id'], + timezone: Workflow['timezone'], + nodeId: string, +): SchedulingFunctions => { const scheduledTaskManager = Container.get(ScheduledTaskManager); return { - registerCron: (cron, onTick) => scheduledTaskManager.registerCron(workflow, cron, onTick), + registerCron: ({ expression, recurrence }: Cron, onTick) => { + const ctx: CronContext = { + expression, + recurrence, + nodeId, + workflowId, + timezone, + }; + + return scheduledTaskManager.registerCron(ctx, onTick); + }, }; }; diff --git a/packages/core/src/execution-engine/scheduled-task-manager.ts b/packages/core/src/execution-engine/scheduled-task-manager.ts index 1dade519b8..36347a4277 100644 --- a/packages/core/src/execution-engine/scheduled-task-manager.ts +++ b/packages/core/src/execution-engine/scheduled-task-manager.ts @@ -3,113 +3,160 @@ import { CronLoggingConfig } from '@n8n/config'; import { Time } from '@n8n/constants'; import { Service } from '@n8n/di'; import { CronJob } from 'cron'; -import type { Cron, Workflow } from 'n8n-workflow'; +import type { CronContext, Workflow } from 'n8n-workflow'; +import { ErrorReporter } from '@/errors'; import { InstanceSettings } from '@/instance-settings'; +type CronKey = string; // see `ScheduledTaskManager.toCronKey` +type Cron = { job: CronJob; summary: string; ctx: CronContext }; +type CronsByWorkflow = Map>; + @Service() export class ScheduledTaskManager { - readonly cronMap = new Map>(); + readonly cronsByWorkflow: CronsByWorkflow = new Map(); - private readonly logInterval?: NodeJS.Timeout; + private logInterval?: NodeJS.Timeout; + + /** Crons currently active instance-wide, to display in logs. */ + private get loggableCrons() { + const loggableCrons: Record = {}; + + for (const [workflowId, crons] of this.cronsByWorkflow) { + loggableCrons[`workflowId-${workflowId}`] = Array.from(crons.values()).map( + ({ summary }) => summary, + ); + } + + return loggableCrons; + } constructor( private readonly instanceSettings: InstanceSettings, private readonly logger: Logger, - private readonly config: CronLoggingConfig, + { activeInterval }: CronLoggingConfig, + private readonly errorReporter: ErrorReporter, ) { this.logger = this.logger.scoped('cron'); - if (this.config.activeInterval === 0) return; + if (activeInterval === 0) return; - this.logInterval = setInterval( - () => this.logActiveCrons(), - this.config.activeInterval * Time.minutes.toMilliseconds, - ); + this.logInterval = setInterval(() => { + if (Object.keys(this.loggableCrons).length === 0) return; + this.logger.debug('Currently active crons', { active: this.loggableCrons }); + }, activeInterval * Time.minutes.toMilliseconds); } - private logActiveCrons() { - const activeCrons: Record = {}; - for (const [workflowId, cronJobs] of this.cronMap) { - activeCrons[`workflow-${workflowId}`] = cronJobs.map( - ({ displayableCron }) => displayableCron, - ); + registerCron(ctx: CronContext, onTick: () => void) { + const { workflowId, timezone, nodeId, expression, recurrence } = ctx; + + const summary = recurrence?.activated + ? `${expression} (every ${recurrence.intervalSize} ${recurrence.typeInterval})` + : expression; + + const workflowCrons = this.cronsByWorkflow.get(workflowId); + const key = this.toCronKey({ workflowId, nodeId, expression, timezone, recurrence }); + + if (workflowCrons?.has(key)) { + this.errorReporter.error('Skipped registration for already registered cron', { + tags: { cron: 'duplicate' }, + extra: { + workflowId, + timezone, + nodeId, + expression, + recurrence, + instanceRole: this.instanceSettings.instanceRole, + }, + }); + return; } - if (Object.keys(activeCrons).length === 0) return; - - this.logger.debug('Currently active crons', { activeCrons }); - } - - registerCron(workflow: Workflow, { expression, recurrence }: Cron, onTick: () => void) { - const recurrenceStr = recurrence?.activated - ? `every ${recurrence.intervalSize} ${recurrence.typeInterval}` - : undefined; - - const displayableCron = recurrenceStr ? `${expression} (${recurrenceStr})` : expression; - - const cronJob = new CronJob( + const job = new CronJob( expression, () => { - if (this.instanceSettings.isLeader) { - this.logger.debug('Executing cron for workflow', { - workflowId: workflow.id, - cron: displayableCron, - instanceRole: this.instanceSettings.instanceRole, - }); - onTick(); - } + if (!this.instanceSettings.isLeader) return; + + this.logger.debug('Executing cron for workflow', { + workflowId, + nodeId, + cron: summary, + instanceRole: this.instanceSettings.instanceRole, + }); + + onTick(); }, undefined, true, - workflow.timezone, + timezone, ); - const workflowCronEntries = this.cronMap.get(workflow.id); - const cronEntry = { job: cronJob, displayableCron }; + const cron: Cron = { job, summary, ctx }; - if (workflowCronEntries) { - workflowCronEntries.push(cronEntry); + if (!workflowCrons) { + this.cronsByWorkflow.set(workflowId, new Map([[key, cron]])); } else { - this.cronMap.set(workflow.id, [cronEntry]); + workflowCrons.set(key, cron); } this.logger.debug('Registered cron for workflow', { - workflowId: workflow.id, - cron: displayableCron, + workflowId, + cron: summary, instanceRole: this.instanceSettings.instanceRole, }); } deregisterCrons(workflowId: string) { - const cronJobs = this.cronMap.get(workflowId) ?? []; + const workflowCrons = this.cronsByWorkflow.get(workflowId); - if (cronJobs.length === 0) return; + if (!workflowCrons || workflowCrons.size === 0) return; - const crons: string[] = []; + const summaries: string[] = []; - while (cronJobs.length) { - const cronEntry = cronJobs.pop(); - if (cronEntry) { - crons.push(cronEntry.displayableCron); - cronEntry.job.stop(); - } + for (const cron of workflowCrons.values()) { + summaries.push(cron.summary); + cron.job.stop(); } - this.cronMap.delete(workflowId); + this.cronsByWorkflow.delete(workflowId); this.logger.info('Deregistered all crons for workflow', { workflowId, - crons, + crons: summaries, instanceRole: this.instanceSettings.instanceRole, }); } deregisterAllCrons() { - for (const workflowId of this.cronMap.keys()) { + for (const workflowId of this.cronsByWorkflow.keys()) { this.deregisterCrons(workflowId); } - if (this.logInterval) clearInterval(this.logInterval); + clearInterval(this.logInterval); + this.logInterval = undefined; + } + + private toCronKey(ctx: CronContext): CronKey { + const { recurrence, ...rest } = ctx; + const flattened: Record = !recurrence + ? rest + : { + ...rest, + recurrenceActivated: recurrence.activated, + ...(recurrence.activated && { + recurrenceIndex: recurrence.index, + recurrenceIntervalSize: recurrence.intervalSize, + recurrenceTypeInterval: recurrence.typeInterval, + }), + }; + + const sorted = Object.keys(flattened) + .sort() + .reduce>((result, key) => { + result[key] = flattened[key]; + return result; + }, {}); + + return JSON.stringify(sorted); } } diff --git a/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts b/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts index 4d5cea4f15..1367f925c9 100644 --- a/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts +++ b/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts @@ -5,6 +5,7 @@ import type { INodeType, INodeTypeDescription, ITriggerResponse, + Cron, } from 'n8n-workflow'; import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; @@ -451,7 +452,10 @@ export class ScheduleTrigger implements INodeType { if (this.getMode() !== 'manual') { for (const { interval, cronExpression, recurrence } of rules) { try { - const cron = { expression: cronExpression, recurrence }; + const cron: Cron = { + expression: cronExpression, + recurrence, + }; this.helpers.registerCron(cron, () => executeTrigger(recurrence)); } catch (error) { if (interval.field === 'cronExpression') { diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index ee52afbf99..0c1a6b7304 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -22,6 +22,8 @@ import { type NodeTypeAndVersion, type VersionedNodeType, type Workflow, + type CronContext, + type Cron, } from 'n8n-workflow'; const logger = mock({ @@ -85,12 +87,21 @@ export async function testTriggerNode( mock(), logger as any, mock(), + mock(), ); const helpers = mock({ createDeferredPromise, returnJsonArray, - registerCron: (cronExpression, onTick) => - scheduledTaskManager.registerCron(workflow, cronExpression, onTick), + registerCron: (cron: Cron, onTick) => { + const ctx: CronContext = { + expression: cron.expression, + recurrence: cron.recurrence, + nodeId: node.id, + workflowId: workflow.id, + timezone: workflow.timezone, + }; + scheduledTaskManager.registerCron(ctx, onTick); + }, }); const triggerFunctions = mock({ @@ -149,11 +160,20 @@ export async function testWebhookTriggerNode( mock(), logger as any, mock(), + mock(), ); const helpers = mock({ returnJsonArray, - registerCron: (cronExpression, onTick) => - scheduledTaskManager.registerCron(workflow, cronExpression, onTick), + registerCron: (cron: Cron, onTick) => { + const ctx: CronContext = { + expression: cron.expression, + recurrence: cron.recurrence, + nodeId: node.id, + workflowId: workflow.id, + timezone: workflow.timezone, + }; + scheduledTaskManager.registerCron(ctx, onTick); + }, prepareBinaryData: options.helpers?.prepareBinaryData ?? jest.fn(), }); diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 2f9d510636..1c78b1a45d 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -843,7 +843,7 @@ type CronUnit = number | '*' | `*/${number}`; export type CronExpression = `${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit}`; -type RecurrenceRule = +type CronRecurrenceRule = | { activated: false } | { activated: true; @@ -852,7 +852,15 @@ type RecurrenceRule = typeInterval: 'hours' | 'days' | 'weeks' | 'months'; }; -export type Cron = { expression: CronExpression; recurrence?: RecurrenceRule }; +export type CronContext = { + nodeId: string; + workflowId: string; + timezone: string; + expression: CronExpression; + recurrence?: CronRecurrenceRule; +}; + +export type Cron = { expression: CronExpression; recurrence?: CronRecurrenceRule }; export interface SchedulingFunctions { registerCron(cron: Cron, onTick: () => void): void;