refactor(core): Centralize CronJob management (#10033)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-07-16 20:42:48 +02:00
committed by GitHub
parent 36b314d031
commit 09f2cf9eaf
18 changed files with 730 additions and 429 deletions

View File

@@ -1,11 +1,9 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
TriggerTime,
@@ -23,10 +21,13 @@ import {
WorkflowDeactivationError,
} from 'n8n-workflow';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import type { IWorkflowData } from './Interfaces';
@Service()
export class ActiveWorkflows {
constructor(private readonly scheduledTaskManager: ScheduledTaskManager) {}
private activeWorkflows: { [workflowId: string]: IWorkflowData } = {};
/**
@@ -102,20 +103,15 @@ export class ActiveWorkflows {
if (pollingNodes.length === 0) return;
this.activeWorkflows[workflowId].pollResponses = [];
for (const pollNode of pollingNodes) {
try {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.activeWorkflows[workflowId].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
@@ -138,7 +134,7 @@ export class ActiveWorkflows {
getPollFunctions: IGetExecutePollFunctions,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<IPollResponse> {
): Promise<void> {
const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);
const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
@@ -161,7 +157,7 @@ export class ActiveWorkflows {
pollFunctions.__emit(pollResponse);
}
} catch (error) {
// If the poll function failes in the first activation
// If the poll function fails in the first activation
// throw the error back so we let the user know there is
// an issue with the trigger.
if (testingTrigger) {
@@ -174,11 +170,6 @@ export class ActiveWorkflows {
// Execute the trigger directly to be able to know if it works
await executeTrigger(true);
const timezone = pollFunctions.getTimezone();
// Start the cron-jobs
const cronJobs: CronJob[] = [];
for (const cronTime of cronTimes) {
const cronTimeParts = cronTime.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
@@ -187,19 +178,8 @@ export class ActiveWorkflows {
);
}
cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger);
}
// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
}
return {
closeFunction,
};
}
/**
@@ -211,14 +191,11 @@ export class ActiveWorkflows {
return false;
}
this.scheduledTaskManager.deregisterCrons(workflowId);
const w = this.activeWorkflows[workflowId];
for (const r of w.triggerResponses ?? []) {
await this.close(r, workflowId, 'trigger');
}
for (const r of w.pollResponses ?? []) {
await this.close(r, workflowId, 'poller');
await this.closeTrigger(r, workflowId);
}
delete this.activeWorkflows[workflowId];
@@ -232,11 +209,7 @@ export class ActiveWorkflows {
}
}
private async close(
response: ITriggerResponse | IPollResponse,
workflowId: string,
target: 'trigger' | 'poller',
) {
private async closeTrigger(response: ITriggerResponse, workflowId: string) {
if (!response.closeFunction) return;
try {
@@ -246,14 +219,14 @@ export class ActiveWorkflows {
Logger.error(
`There was a problem calling "closeFunction" on "${e.node.name}" in workflow "${workflowId}"`,
);
ErrorReporter.error(e, { extra: { target, workflowId } });
ErrorReporter.error(e, { extra: { workflowId } });
return;
}
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowDeactivationError(
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
`Failed to deactivate trigger of workflow ID "${workflowId}": "${error.message}"`,
{ cause: error, workflowId },
);
}

View File

@@ -1,5 +1,4 @@
import type {
IPollResponse,
ITriggerResponse,
IWorkflowSettings as IWorkflowSettingsWorkflow,
ValidationResult,
@@ -18,7 +17,6 @@ export interface IWorkflowSettings extends IWorkflowSettingsWorkflow {
}
export interface IWorkflowData {
pollResponses?: IPollResponse[];
triggerResponses?: ITriggerResponse[];
}

View File

@@ -102,6 +102,7 @@ import type {
INodeParameters,
EnsureTypeOptions,
SSHTunnelFunctions,
SchedulingFunctions,
} from 'n8n-workflow';
import {
ExpressionError,
@@ -114,7 +115,6 @@ import {
createDeferredPromise,
deepCopy,
fileTypeFromMimeType,
getGlobalState,
isObjectEmpty,
isResourceMapperValue,
validateFieldType,
@@ -157,6 +157,7 @@ import Container from 'typedi';
import type { BinaryData } from './BinaryData/types';
import merge from 'lodash/merge';
import { InstanceSettings } from './InstanceSettings';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { SSHClientsManager } from './SSHClientsManager';
import { binaryToBuffer } from './BinaryData/utils';
@@ -2585,13 +2586,6 @@ export function getNodeWebhookUrl(
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id, node, path.toString(), isFullPath);
}
/**
* Returns the timezone for the workflow
*/
export function getTimezone(workflow: Workflow): string {
return workflow.settings.timezone ?? getGlobalState().defaultTimezone;
}
/**
* Returns the full webhook description of the webhook with the given name
*
@@ -2957,7 +2951,7 @@ const getCommonWorkflowFunctions = (
getRestApiUrl: () => additionalData.restApiUrl,
getInstanceBaseUrl: () => additionalData.instanceBaseUrl,
getInstanceId: () => Container.get(InstanceSettings).instanceId,
getTimezone: () => getTimezone(workflow),
getTimezone: () => workflow.timezone,
getCredentialsProperties: (type: string) =>
additionalData.credentialsHelper.getCredentialsProperties(type),
prepareOutputData: async (outputData) => [outputData],
@@ -3286,6 +3280,14 @@ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
await Container.get(SSHClientsManager).getClient(credentials),
});
const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
};
};
const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) {
@@ -3489,6 +3491,7 @@ export function getExecutePollFunctions(
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
@@ -3553,6 +3556,7 @@ export function getExecuteTriggerFunctions(
...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};

View File

@@ -0,0 +1,31 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type { CronExpression, Workflow } from 'n8n-workflow';
@Service()
export class ScheduledTaskManager {
readonly cronJobs = new Map<string, CronJob[]>();
registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
const cronJob = new CronJob(cronExpression, onTick, undefined, true, workflow.timezone);
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
if (cronJobsForWorkflow) {
cronJobsForWorkflow.push(cronJob);
} else {
this.cronJobs.set(workflow.id, [cronJob]);
}
}
deregisterCrons(workflowId: string) {
const cronJobs = this.cronJobs.get(workflowId) ?? [];
for (const cronJob of cronJobs) {
cronJob.stop();
}
}
deregisterAllCrons() {
for (const workflowId of Object.keys(this.cronJobs)) {
this.deregisterCrons(workflowId);
}
}
}