diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index 8bbac021b0..3b8e272b9c 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -19,10 +19,22 @@ export const LOG_SCOPES = [ 'insights', 'workflow-activation', 'ssh-client', + 'cron', ] as const; export type LogScope = (typeof LOG_SCOPES)[number]; +@Config +export class CronLoggingConfig { + /** + * Interval in minutes to log currently active cron jobs. Set to `0` to disable. + * + * @example `N8N_LOG_CRON_ACTIVE_INTERVAL=30` will log active crons every 30 minutes. + */ + @Env('N8N_LOG_CRON_ACTIVE_INTERVAL') + activeInterval: number = 0; +} + @Config class FileLoggingConfig { /** @@ -79,6 +91,9 @@ export class LoggingConfig { @Nested file: FileLoggingConfig; + @Nested + cron: CronLoggingConfig; + /** * Scopes to filter logs by. Nothing is filtered by default. * diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index b6b8cdf85c..31b73bb095 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -51,6 +51,7 @@ export { MfaConfig } from './configs/mfa.config'; export { HiringBannerConfig } from './configs/hiring-banner.config'; export { PersonalizationConfig } from './configs/personalization.config'; export { NodesConfig } from './configs/nodes.config'; +export { CronLoggingConfig } from './configs/logging.config'; const protocolSchema = z.enum(['http', 'https']); diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 806f36532b..379e684443 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -275,6 +275,9 @@ describe('GlobalConfig', () => { location: 'logs/n8n.log', }, scopes: [], + cron: { + activeInterval: 0, + }, }, multiMainSetup: { enabled: false, diff --git a/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts b/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts index 4c39d3afbc..f3836a9376 100644 --- a/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts +++ b/packages/core/src/execution-engine/__tests__/scheduled-task-manager.test.ts @@ -1,3 +1,4 @@ +import type { Logger } from '@n8n/backend-common'; import { mock } from 'jest-mock-extended'; import type { Workflow } from 'n8n-workflow'; @@ -5,6 +6,8 @@ import type { InstanceSettings } from '@/instance-settings'; import { ScheduledTaskManager } from '../scheduled-task-manager'; +const logger = mock({ scoped: jest.fn().mockReturnValue(mock()) }); + describe('ScheduledTaskManager', () => { const instanceSettings = mock({ isLeader: true }); const workflow = mock({ timezone: 'GMT' }); @@ -16,14 +19,14 @@ describe('ScheduledTaskManager', () => { beforeEach(() => { jest.clearAllMocks(); jest.useFakeTimers(); - scheduledTaskManager = new ScheduledTaskManager(instanceSettings); + scheduledTaskManager = new ScheduledTaskManager(instanceSettings, logger, mock()); }); it('should throw when workflow timezone is invalid', () => { expect(() => scheduledTaskManager.registerCron( mock({ timezone: 'somewhere' }), - everyMinute, + { expression: everyMinute }, onTick, ), ).toThrow('Invalid timezone.'); @@ -36,17 +39,21 @@ describe('ScheduledTaskManager', () => { ).toThrow(); }); - it('should register valid CronJobs', async () => { - scheduledTaskManager.registerCron(workflow, everyMinute, onTick); + it('should register valid CronJobs', () => { + scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes expect(onTick).toHaveBeenCalledTimes(10); }); - it('should should not invoke on follower instances', async () => { - scheduledTaskManager = new ScheduledTaskManager(mock({ isLeader: false })); - scheduledTaskManager.registerCron(workflow, everyMinute, onTick); + it('should not invoke on follower instances', () => { + scheduledTaskManager = new ScheduledTaskManager( + mock({ isLeader: false }), + logger, + mock(), + ); + scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes @@ -54,18 +61,26 @@ describe('ScheduledTaskManager', () => { }); it('should deregister CronJobs for a workflow', async () => { - scheduledTaskManager.registerCron(workflow, everyMinute, onTick); - scheduledTaskManager.registerCron(workflow, everyMinute, onTick); - scheduledTaskManager.registerCron(workflow, everyMinute, onTick); + scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); + scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); + scheduledTaskManager.registerCron(workflow, { expression: everyMinute }, onTick); - expect(scheduledTaskManager.cronJobs.get(workflow.id)?.length).toBe(3); + expect(scheduledTaskManager.cronMap.get(workflow.id)).toHaveLength(3); scheduledTaskManager.deregisterCrons(workflow.id); - expect(scheduledTaskManager.cronJobs.get(workflow.id)?.length).toBe(0); + expect(scheduledTaskManager.cronMap.get(workflow.id)).toBeUndefined(); expect(onTick).not.toHaveBeenCalled(); jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes expect(onTick).not.toHaveBeenCalled(); }); + + it('should not set up log interval when activeInterval is 0', () => { + const configWithZeroInterval = mock({ activeInterval: 0 }); + const manager = new ScheduledTaskManager(instanceSettings, logger, configWithZeroInterval); + + // @ts-expect-error Private property + expect(manager.logInterval).toBeUndefined(); + }); }); diff --git a/packages/core/src/execution-engine/active-workflows.ts b/packages/core/src/execution-engine/active-workflows.ts index 2f4db747bb..7fd1e6a0e9 100644 --- a/packages/core/src/execution-engine/active-workflows.ts +++ b/packages/core/src/execution-engine/active-workflows.ts @@ -149,7 +149,7 @@ export class ActiveWorkflows { }; // Get all the trigger times - const cronTimes = (pollTimes.item || []).map(toCronExpression); + const cronExpressions = (pollTimes.item || []).map(toCronExpression); // The trigger function to execute when the cron-time got reached const executeTrigger = async (testingTrigger = false) => { this.logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, { @@ -177,15 +177,15 @@ export class ActiveWorkflows { // Execute the trigger directly to be able to know if it works await executeTrigger(true); - for (const cronTime of cronTimes) { - const cronTimeParts = cronTime.split(' '); + for (const expression of cronExpressions) { + const cronTimeParts = expression.split(' '); if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) { throw new ApplicationError( 'The polling interval is too short. It has to be at least a minute.', ); } - this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger); + this.scheduledTaskManager.registerCron(workflow, { expression }, executeTrigger); } } diff --git a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts index e9a5e63f58..5c65225b25 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/scheduling-helper-functions.test.ts @@ -19,11 +19,11 @@ describe('getSchedulingFunctions', () => { describe('registerCron', () => { it('should invoke scheduledTaskManager.registerCron', () => { - schedulingFunctions.registerCron(cronExpression, onTick); + schedulingFunctions.registerCron({ expression: cronExpression }, onTick); expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith( workflow, - cronExpression, + { expression: cronExpression }, onTick, ); }); diff --git a/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts b/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts index d2e0034dc0..64363f6eb0 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/scheduling-helper-functions.ts @@ -6,7 +6,6 @@ import { ScheduledTaskManager } from '../../scheduled-task-manager'; export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => { const scheduledTaskManager = Container.get(ScheduledTaskManager); return { - registerCron: (cronExpression, onTick) => - scheduledTaskManager.registerCron(workflow, cronExpression, onTick), + registerCron: (cron, onTick) => scheduledTaskManager.registerCron(workflow, cron, onTick), }; }; diff --git a/packages/core/src/execution-engine/scheduled-task-manager.ts b/packages/core/src/execution-engine/scheduled-task-manager.ts index 1a20700b00..1dade519b8 100644 --- a/packages/core/src/execution-engine/scheduled-task-manager.ts +++ b/packages/core/src/execution-engine/scheduled-task-manager.ts @@ -1,44 +1,115 @@ +import { Logger } from '@n8n/backend-common'; +import { CronLoggingConfig } from '@n8n/config'; +import { Time } from '@n8n/constants'; import { Service } from '@n8n/di'; import { CronJob } from 'cron'; -import type { CronExpression, Workflow } from 'n8n-workflow'; +import type { Cron, Workflow } from 'n8n-workflow'; import { InstanceSettings } from '@/instance-settings'; @Service() export class ScheduledTaskManager { - constructor(private readonly instanceSettings: InstanceSettings) {} + readonly cronMap = new Map>(); - readonly cronJobs = new Map(); + private readonly logInterval?: NodeJS.Timeout; + + constructor( + private readonly instanceSettings: InstanceSettings, + private readonly logger: Logger, + private readonly config: CronLoggingConfig, + ) { + this.logger = this.logger.scoped('cron'); + + if (this.config.activeInterval === 0) return; + + this.logInterval = setInterval( + () => this.logActiveCrons(), + this.config.activeInterval * Time.minutes.toMilliseconds, + ); + } + + private logActiveCrons() { + const activeCrons: Record = {}; + for (const [workflowId, cronJobs] of this.cronMap) { + activeCrons[`workflow-${workflowId}`] = cronJobs.map( + ({ displayableCron }) => displayableCron, + ); + } + + if (Object.keys(activeCrons).length === 0) return; + + this.logger.debug('Currently active crons', { activeCrons }); + } + + registerCron(workflow: Workflow, { expression, recurrence }: Cron, onTick: () => void) { + const recurrenceStr = recurrence?.activated + ? `every ${recurrence.intervalSize} ${recurrence.typeInterval}` + : undefined; + + const displayableCron = recurrenceStr ? `${expression} (${recurrenceStr})` : expression; - registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) { const cronJob = new CronJob( - cronExpression, + expression, () => { - if (this.instanceSettings.isLeader) onTick(); + if (this.instanceSettings.isLeader) { + this.logger.debug('Executing cron for workflow', { + workflowId: workflow.id, + cron: displayableCron, + instanceRole: this.instanceSettings.instanceRole, + }); + onTick(); + } }, undefined, true, workflow.timezone, ); - const cronJobsForWorkflow = this.cronJobs.get(workflow.id); - if (cronJobsForWorkflow) { - cronJobsForWorkflow.push(cronJob); + + const workflowCronEntries = this.cronMap.get(workflow.id); + const cronEntry = { job: cronJob, displayableCron }; + + if (workflowCronEntries) { + workflowCronEntries.push(cronEntry); } else { - this.cronJobs.set(workflow.id, [cronJob]); + this.cronMap.set(workflow.id, [cronEntry]); } + + this.logger.debug('Registered cron for workflow', { + workflowId: workflow.id, + cron: displayableCron, + instanceRole: this.instanceSettings.instanceRole, + }); } deregisterCrons(workflowId: string) { - const cronJobs = this.cronJobs.get(workflowId) ?? []; + const cronJobs = this.cronMap.get(workflowId) ?? []; + + if (cronJobs.length === 0) return; + + const crons: string[] = []; + while (cronJobs.length) { - const cronJob = cronJobs.pop(); - if (cronJob) cronJob.stop(); + const cronEntry = cronJobs.pop(); + if (cronEntry) { + crons.push(cronEntry.displayableCron); + cronEntry.job.stop(); + } } + + this.cronMap.delete(workflowId); + + this.logger.info('Deregistered all crons for workflow', { + workflowId, + crons, + instanceRole: this.instanceSettings.instanceRole, + }); } deregisterAllCrons() { - for (const workflowId of Object.keys(this.cronJobs)) { + for (const workflowId of this.cronMap.keys()) { this.deregisterCrons(workflowId); } + + if (this.logInterval) clearInterval(this.logInterval); } } diff --git a/packages/nodes-base/nodes/Cron/Cron.node.ts b/packages/nodes-base/nodes/Cron/Cron.node.ts index 3e85a4c0b2..b2fa21f259 100644 --- a/packages/nodes-base/nodes/Cron/Cron.node.ts +++ b/packages/nodes-base/nodes/Cron/Cron.node.ts @@ -56,7 +56,7 @@ export class Cron implements INodeType { }; // Get all the trigger times - const cronTimes = (triggerTimes.item || []).map(toCronExpression); + const expressions = (triggerTimes.item || []).map(toCronExpression); // The trigger function to execute when the cron-time got reached // or when manually triggered @@ -65,7 +65,7 @@ export class Cron implements INodeType { }; // Register the cron-jobs - cronTimes.forEach((cronTime) => this.helpers.registerCron(cronTime, executeTrigger)); + expressions.forEach((expression) => this.helpers.registerCron({ expression }, executeTrigger)); return { manualTriggerFunction: async () => executeTrigger(), diff --git a/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts b/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts index e4d70536ae..4d5cea4f15 100644 --- a/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts +++ b/packages/nodes-base/nodes/Schedule/ScheduleTrigger.node.ts @@ -451,7 +451,8 @@ export class ScheduleTrigger implements INodeType { if (this.getMode() !== 'manual') { for (const { interval, cronExpression, recurrence } of rules) { try { - this.helpers.registerCron(cronExpression, () => executeTrigger(recurrence)); + const cron = { expression: cronExpression, recurrence }; + this.helpers.registerCron(cron, () => executeTrigger(recurrence)); } catch (error) { if (interval.field === 'cronExpression') { throw new NodeOperationError(this.getNode(), 'Invalid cron expression', { diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index 91fbd9aae5..ee52afbf99 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -24,6 +24,17 @@ import { type Workflow, } from 'n8n-workflow'; +const logger = mock({ + scoped: jest.fn().mockReturnValue( + mock({ + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }), + ), +}); + type MockDeepPartial = Parameters>[0]; type TestTriggerNodeOptions = { @@ -70,7 +81,11 @@ export async function testTriggerNode( ) as INode; const workflow = mock({ timezone: options.timezone ?? 'Europe/Berlin' }); - const scheduledTaskManager = new ScheduledTaskManager(mock()); + const scheduledTaskManager = new ScheduledTaskManager( + mock(), + logger as any, + mock(), + ); const helpers = mock({ createDeferredPromise, returnJsonArray, @@ -130,7 +145,11 @@ export async function testWebhookTriggerNode( ) as INode; const workflow = mock({ timezone: options.timezone ?? 'Europe/Berlin' }); - const scheduledTaskManager = new ScheduledTaskManager(mock()); + const scheduledTaskManager = new ScheduledTaskManager( + mock(), + logger as any, + mock(), + ); const helpers = mock({ returnJsonArray, registerCron: (cronExpression, onTick) => diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 33e055fbff..a0eef7e6a7 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -843,8 +843,19 @@ type CronUnit = number | '*' | `*/${number}`; export type CronExpression = `${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit} ${CronUnit}`; +type RecurrenceRule = + | { activated: false } + | { + activated: true; + index: number; + intervalSize: number; + typeInterval: 'hours' | 'days' | 'weeks' | 'months'; + }; + +export type Cron = { expression: CronExpression; recurrence?: RecurrenceRule }; + export interface SchedulingFunctions { - registerCron(cronExpression: CronExpression, onTick: () => void): void; + registerCron(cron: Cron, onTick: () => void): void; } export type NodeTypeAndVersion = {