mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
refactor(core): Port timeout config (#18722)
This commit is contained in:
@@ -39,6 +39,18 @@ class QueueRecoveryConfig {
|
||||
|
||||
@Config
|
||||
export class ExecutionsConfig {
|
||||
/**
|
||||
* How long (seconds) a workflow execution may run for before timeout.
|
||||
* On timeout, the execution will be forcefully stopped. `-1` for unlimited.
|
||||
* Currently unlimited by default - this default will change in a future version.
|
||||
*/
|
||||
@Env('EXECUTIONS_TIMEOUT')
|
||||
timeout: number = -1;
|
||||
|
||||
/** How long (seconds) a workflow execution may run for at most. */
|
||||
@Env('EXECUTIONS_TIMEOUT_MAX')
|
||||
maxTimeout: number = 3600; // 1h
|
||||
|
||||
/** Whether to delete past executions on a rolling basis. */
|
||||
@Env('EXECUTIONS_DATA_PRUNE')
|
||||
pruneData: boolean = true;
|
||||
|
||||
@@ -302,6 +302,8 @@ describe('GlobalConfig', () => {
|
||||
disableWebhookHtmlSandboxing: false,
|
||||
},
|
||||
executions: {
|
||||
timeout: -1,
|
||||
maxTimeout: 3600,
|
||||
pruneData: true,
|
||||
pruneDataMaxAge: 336,
|
||||
pruneDataMaxCount: 10_000,
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { testDb, createWorkflow, mockInstance } from '@n8n/backend-test-utils';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import type { User, ExecutionEntity } from '@n8n/db';
|
||||
import { Container, Service } from '@n8n/di';
|
||||
import { createExecution } from '@test-integration/db/executions';
|
||||
import { createUser } from '@test-integration/db/users';
|
||||
import { setupTestServer } from '@test-integration/utils';
|
||||
import type { Response } from 'express';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
|
||||
@@ -30,9 +34,6 @@ import { ManualExecutionService } from '@/manual-execution.service';
|
||||
import { Telemetry } from '@/telemetry';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import { WorkflowRunner } from '@/workflow-runner';
|
||||
import { createExecution } from '@test-integration/db/executions';
|
||||
import { createUser } from '@test-integration/db/users';
|
||||
import { setupTestServer } from '@test-integration/utils';
|
||||
|
||||
let owner: User;
|
||||
let runner: WorkflowRunner;
|
||||
@@ -333,7 +334,7 @@ describe('workflow timeout with startedAt', () => {
|
||||
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||
|
||||
// Mock config to return a workflow timeout of 10 seconds
|
||||
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||
Container.get(GlobalConfig).executions.timeout = 10;
|
||||
|
||||
const startedAt = new Date(Date.now() - 5000); // 5 seconds ago
|
||||
const data = mock<IWorkflowExecutionDataProcess>({
|
||||
@@ -389,7 +390,7 @@ describe('workflow timeout with startedAt', () => {
|
||||
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||
|
||||
// Mock config to return a workflow timeout of 10 seconds
|
||||
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||
Container.get(GlobalConfig).executions.timeout = 10;
|
||||
|
||||
const startedAt = new Date(Date.now() - 15000); // 15 seconds ago (timeout already elapsed)
|
||||
const data = mock<IWorkflowExecutionDataProcess>({
|
||||
@@ -436,7 +437,7 @@ describe('workflow timeout with startedAt', () => {
|
||||
const mockStopExecution = jest.spyOn(activeExecutions, 'stopExecution');
|
||||
|
||||
// Mock config to return a workflow timeout of 10 seconds
|
||||
jest.spyOn(config, 'getEnv').mockReturnValue(10);
|
||||
Container.get(GlobalConfig).executions.timeout = 10;
|
||||
|
||||
const data = mock<IWorkflowExecutionDataProcess>({
|
||||
workflowData: {
|
||||
|
||||
@@ -9,30 +9,6 @@ export const schema = {
|
||||
default: 'regular',
|
||||
env: 'EXECUTIONS_MODE',
|
||||
},
|
||||
|
||||
// A Workflow times out and gets canceled after this time (seconds).
|
||||
// If the workflow is executed in the main process a soft timeout
|
||||
// is executed (takes effect after the current node finishes).
|
||||
// If a workflow is running in its own process is a soft timeout
|
||||
// tried first, before killing the process after waiting for an
|
||||
// additional fifth of the given timeout duration.
|
||||
//
|
||||
// To deactivate timeout set it to -1
|
||||
//
|
||||
// Timeout is currently not activated by default which will change
|
||||
// in a future version.
|
||||
timeout: {
|
||||
doc: 'Max run time (seconds) before stopping the workflow execution',
|
||||
format: Number,
|
||||
default: -1,
|
||||
env: 'EXECUTIONS_TIMEOUT',
|
||||
},
|
||||
maxTimeout: {
|
||||
doc: 'Max execution time (seconds) that can be set for a workflow individually',
|
||||
format: Number,
|
||||
default: 3600,
|
||||
env: 'EXECUTIONS_TIMEOUT_MAX',
|
||||
},
|
||||
},
|
||||
|
||||
userManagement: {
|
||||
|
||||
@@ -829,8 +829,8 @@ export class TelemetryEventRelay extends EventRelay {
|
||||
},
|
||||
execution_variables: {
|
||||
executions_mode: config.getEnv('executions.mode'),
|
||||
executions_timeout: config.getEnv('executions.timeout'),
|
||||
executions_timeout_max: config.getEnv('executions.maxTimeout'),
|
||||
executions_timeout: this.globalConfig.executions.timeout,
|
||||
executions_timeout_max: this.globalConfig.executions.maxTimeout,
|
||||
executions_data_save_on_error: this.globalConfig.executions.saveDataOnError,
|
||||
executions_data_save_on_success: this.globalConfig.executions.saveDataOnSuccess,
|
||||
executions_data_save_on_progress: this.globalConfig.executions.saveExecutionProgress,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import type { ExecutionsConfig } from '@n8n/config';
|
||||
import type { IExecutionResponse, ExecutionRepository } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { WorkflowExecute as ActualWorkflowExecute } from 'n8n-core';
|
||||
@@ -13,18 +14,18 @@ import {
|
||||
type WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { JobProcessor } from '../job-processor';
|
||||
import type { Job } from '../scaling.types';
|
||||
|
||||
import { CredentialsHelper } from '@/credentials-helper';
|
||||
import { VariablesService } from '@/environments.ee/variables/variables.service.ee';
|
||||
import { ExternalHooks } from '@/external-hooks';
|
||||
import type { ManualExecutionService } from '@/manual-execution.service';
|
||||
import { DataStoreProxyService } from '@/modules/data-table/data-store-proxy.service';
|
||||
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
|
||||
import { JobProcessor } from '../job-processor';
|
||||
import type { Job } from '../scaling.types';
|
||||
import { DataStoreProxyService } from '@/modules/data-table/data-store-proxy.service';
|
||||
|
||||
mockInstance(VariablesService, {
|
||||
getAllCached: jest.fn().mockResolvedValue([]),
|
||||
});
|
||||
@@ -52,6 +53,11 @@ const logger = mock<Logger>({
|
||||
scoped: jest.fn().mockImplementation(() => logger),
|
||||
});
|
||||
|
||||
const executionsConfig = mock<ExecutionsConfig>({
|
||||
timeout: -1,
|
||||
maxTimeout: 3600,
|
||||
});
|
||||
|
||||
describe('JobProcessor', () => {
|
||||
it('should refrain from processing a crashed execution', async () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
@@ -65,6 +71,7 @@ describe('JobProcessor', () => {
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
const result = await jobProcessor.processJob(mock<Job>());
|
||||
@@ -94,6 +101,7 @@ describe('JobProcessor', () => {
|
||||
mock(),
|
||||
mock(),
|
||||
manualExecutionService,
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
await jobProcessor.processJob(mock<Job>());
|
||||
@@ -132,6 +140,7 @@ describe('JobProcessor', () => {
|
||||
mock(),
|
||||
mock(),
|
||||
manualExecutionService,
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
const executionId = 'execution-id';
|
||||
@@ -194,6 +203,7 @@ describe('JobProcessor', () => {
|
||||
mock(),
|
||||
mock(),
|
||||
manualExecutionService,
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
await jobProcessor.processJob(mock<Job>());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { RunningJobSummary } from '@n8n/api-types';
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { ExecutionsConfig } from '@n8n/config';
|
||||
import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import { WorkflowHasIssuesError, InstanceSettings, WorkflowExecute } from 'n8n-core';
|
||||
@@ -13,7 +14,6 @@ import type {
|
||||
import { BINARY_ENCODING, Workflow, UnexpectedError } from 'n8n-workflow';
|
||||
import type PCancelable from 'p-cancelable';
|
||||
|
||||
import config from '@/config';
|
||||
import { getLifecycleHooksForScalingWorker } from '@/execution-lifecycle/execution-lifecycle-hooks';
|
||||
import { ManualExecutionService } from '@/manual-execution.service';
|
||||
import { NodeTypes } from '@/node-types';
|
||||
@@ -43,6 +43,7 @@ export class JobProcessor {
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly manualExecutionService: ManualExecutionService,
|
||||
private readonly executionsConfig: ExecutionsConfig,
|
||||
) {
|
||||
this.logger = this.logger.scoped('scaling');
|
||||
}
|
||||
@@ -97,12 +98,12 @@ export class JobProcessor {
|
||||
|
||||
const workflowSettings = execution.workflowData.settings ?? {};
|
||||
|
||||
let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout');
|
||||
let workflowTimeout = workflowSettings.executionTimeout ?? this.executionsConfig.timeout;
|
||||
|
||||
let executionTimeoutTimestamp: number | undefined;
|
||||
|
||||
if (workflowTimeout > 0) {
|
||||
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout'));
|
||||
workflowTimeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout);
|
||||
executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000;
|
||||
}
|
||||
|
||||
|
||||
@@ -121,8 +121,8 @@ export class FrontendService {
|
||||
saveDataSuccessExecution: this.globalConfig.executions.saveDataOnSuccess,
|
||||
saveManualExecutions: this.globalConfig.executions.saveDataManualExecutions,
|
||||
saveExecutionProgress: this.globalConfig.executions.saveExecutionProgress,
|
||||
executionTimeout: config.getEnv('executions.timeout'),
|
||||
maxExecutionTimeout: config.getEnv('executions.maxTimeout'),
|
||||
executionTimeout: this.globalConfig.executions.timeout,
|
||||
maxExecutionTimeout: this.globalConfig.executions.maxTimeout,
|
||||
workflowCallerPolicyDefaultOption: this.globalConfig.workflows.callerPolicyDefaultOption,
|
||||
timezone: this.globalConfig.generic.timezone,
|
||||
urlBaseWebhook: this.urlService.getWebhookBaseUrl(),
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { ExecutionsConfig } from '@n8n/config';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { Container, Service } from '@n8n/di';
|
||||
import type { ExecutionLifecycleHooks } from 'n8n-core';
|
||||
@@ -58,6 +59,7 @@ export class WorkflowRunner {
|
||||
private readonly manualExecutionService: ManualExecutionService,
|
||||
private readonly executionDataService: ExecutionDataService,
|
||||
private readonly eventService: EventService,
|
||||
private readonly executionsConfig: ExecutionsConfig,
|
||||
) {}
|
||||
|
||||
setExecutionMode(mode: 'regular' | 'queue') {
|
||||
@@ -219,9 +221,9 @@ export class WorkflowRunner {
|
||||
let executionTimeout: NodeJS.Timeout;
|
||||
|
||||
const workflowSettings = data.workflowData.settings ?? {};
|
||||
let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default
|
||||
let workflowTimeout = workflowSettings.executionTimeout ?? this.executionsConfig.timeout; // initialize with default
|
||||
if (workflowTimeout > 0) {
|
||||
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout'));
|
||||
workflowTimeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout);
|
||||
}
|
||||
|
||||
let pinData: IPinData | undefined;
|
||||
@@ -305,7 +307,7 @@ export class WorkflowRunner {
|
||||
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
|
||||
|
||||
if (workflowTimeout > 0) {
|
||||
let timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as milliseconds
|
||||
let timeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout) * 1000; // as milliseconds
|
||||
if (data.startedAt && data.startedAt instanceof Date) {
|
||||
// If startedAt is set, we calculate the timeout based on the startedAt time
|
||||
// This is useful for executions that were waiting in a waiting state
|
||||
|
||||
@@ -23,7 +23,6 @@ import { NodeApiError, PROJECT_ROOT } from 'n8n-workflow';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||
import config from '@/config';
|
||||
import { FolderNotFoundError } from '@/errors/folder-not-found.error';
|
||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||
@@ -278,7 +277,7 @@ export class WorkflowService {
|
||||
}
|
||||
}
|
||||
|
||||
if (workflowSettings.executionTimeout === config.get('executions.timeout')) {
|
||||
if (workflowSettings.executionTimeout === this.globalConfig.executions.timeout) {
|
||||
// Do not save when default got set
|
||||
delete workflowSettings.executionTimeout;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user