From 516f3b7b4bf9bb686a3f7aa378ac63b79ba0fe95 Mon Sep 17 00:00:00 2001
From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
Date: Mon, 9 Dec 2024 13:11:29 +0200
Subject: [PATCH] feat(core): Detect restart loop in a task runner process (no-changelog) (#12003)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Iván Ovejero
---
 ...nner-process-restart-loop-detector.test.ts | 57 +++++++++++++++
 .../errors/task-runner-restart-loop-error.ts  | 14 ++++
 .../cli/src/runners/task-runner-module.ts     | 30 +++++++-
 ...sk-runner-process-restart-loop-detector.ts | 73 +++++++++++++++++++
 .../task-runner-module.external.test.ts       |  3 +-
 .../runners/task-runner-process.test.ts       | 30 ++++++++
 6 files changed, 205 insertions(+), 2 deletions(-)
 create mode 100644 packages/cli/src/runners/__tests__/task-runner-process-restart-loop-detector.test.ts
 create mode 100644 packages/cli/src/runners/errors/task-runner-restart-loop-error.ts
 create mode 100644 packages/cli/src/runners/task-runner-process-restart-loop-detector.ts

diff --git a/packages/cli/src/runners/__tests__/task-runner-process-restart-loop-detector.test.ts b/packages/cli/src/runners/__tests__/task-runner-process-restart-loop-detector.test.ts
new file mode 100644
index 0000000000..61cfb8b8e8
--- /dev/null
+++ b/packages/cli/src/runners/__tests__/task-runner-process-restart-loop-detector.test.ts
@@ -0,0 +1,57 @@
+import { TaskRunnersConfig } from '@n8n/config';
+import { mock } from 'jest-mock-extended';
+
+import type { Logger } from '@/logging/logger.service';
+import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.service';
+import { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
+import { RunnerLifecycleEvents } from '@/runners/runner-lifecycle-events';
+import { TaskRunnerProcess } from '@/runners/task-runner-process';
+import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
+
+describe('TaskRunnerProcessRestartLoopDetector', () => {
+	const mockLogger = mock<Logger>();
+	const mockAuthService = mock<TaskRunnerAuthService>();
+	const runnerConfig = new TaskRunnersConfig();
+	const taskRunnerProcess = new TaskRunnerProcess(
+		mockLogger,
+		runnerConfig,
+		mockAuthService,
+		new RunnerLifecycleEvents(),
+	);
+
+	it('should detect a restart loop if process exits 5 times within 2s', () => {
+		const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(taskRunnerProcess);
+		let emittedError: TaskRunnerRestartLoopError | undefined = undefined;
+		restartLoopDetector.on('restart-loop-detected', (error) => {
+			emittedError = error;
+		});
+
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+
+		expect(emittedError).toBeInstanceOf(TaskRunnerRestartLoopError);
+	});
+
+	it('should not detect a restart loop if process exits less than 5 times within 2s', () => {
+		jest.useFakeTimers();
+		const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(taskRunnerProcess);
+		let emittedError: TaskRunnerRestartLoopError | undefined = undefined;
+		restartLoopDetector.on('restart-loop-detected', (error) => {
+			emittedError = error;
+		});
+
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+		taskRunnerProcess.emit('exit');
+
+		jest.advanceTimersByTime(5010);
+
+		taskRunnerProcess.emit('exit');
+
+		expect(emittedError).toBeUndefined();
+	});
+});
diff --git a/packages/cli/src/runners/errors/task-runner-restart-loop-error.ts b/packages/cli/src/runners/errors/task-runner-restart-loop-error.ts
new file mode 100644
index 0000000000..b788d83808
--- /dev/null
+++ b/packages/cli/src/runners/errors/task-runner-restart-loop-error.ts
@@ -0,0 +1,14 @@
+import { ApplicationError } from 'n8n-workflow';
+
+export class TaskRunnerRestartLoopError extends ApplicationError {
+	constructor(
+		public readonly howManyTimes: number,
+		public readonly timePeriodMs: number,
+	) {
+		const message = `Task runner has restarted ${howManyTimes} times within ${timePeriodMs / 1000} seconds. This is an abnormally high restart rate that suggests a bug or other issue is preventing your runner process from starting up. If this issue persists, please file a report at: https://github.com/n8n-io/n8n/issues`;
+
+		super(message, {
+			level: 'fatal',
+		});
+	}
+}
diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts
index 1502dd1f07..97631e0763 100644
--- a/packages/cli/src/runners/task-runner-module.ts
+++ b/packages/cli/src/runners/task-runner-module.ts
@@ -1,9 +1,13 @@
 import { TaskRunnersConfig } from '@n8n/config';
+import { ErrorReporterProxy, sleep } from 'n8n-workflow';
 import * as a from 'node:assert/strict';
 import Container, { Service } from 'typedi';
 
 import { OnShutdown } from '@/decorators/on-shutdown';
+import { Logger } from '@/logging/logger.service';
+import type { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
 import type { TaskRunnerProcess } from '@/runners/task-runner-process';
+import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
 
 import { MissingAuthTokenError } from './errors/missing-auth-token.error';
 import { TaskRunnerWsServer } from './runner-ws-server';
@@ -25,7 +29,14 @@ export class TaskRunnerModule {
 
 	private taskRunnerProcess: TaskRunnerProcess | undefined;
 
-	constructor(private readonly runnerConfig: TaskRunnersConfig) {}
+	private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;
+
+	constructor(
+		private readonly logger: Logger,
+		private readonly runnerConfig: TaskRunnersConfig,
+	) {
+		this.logger = this.logger.scoped('task-runner');
+	}
 
 	async start() {
 		a.ok(this.runnerConfig.enabled, 'Task runner is disabled');
@@ -83,6 +94,14 @@ export class TaskRunnerModule {
 
 		const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
 		this.taskRunnerProcess = Container.get(TaskRunnerProcess);
+		this.taskRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
+			this.taskRunnerProcess,
+		);
+		this.taskRunnerProcessRestartLoopDetector.on(
+			'restart-loop-detected',
+			this.onRunnerRestartLoopDetected,
+		);
+
 		await this.taskRunnerProcess.start();
 
 		const { InternalTaskRunnerDisconnectAnalyzer } = await import(
@@ -92,4 +111,13 @@ export class TaskRunnerModule {
 			Container.get(InternalTaskRunnerDisconnectAnalyzer),
 		);
 	}
+
+	private onRunnerRestartLoopDetected = async (error: TaskRunnerRestartLoopError) => {
+		this.logger.error(error.message);
+		ErrorReporterProxy.error(error);
+
+		// Allow some time for the error to be flushed
+		await sleep(1000);
+		process.exit(1);
+	};
 }
diff --git a/packages/cli/src/runners/task-runner-process-restart-loop-detector.ts b/packages/cli/src/runners/task-runner-process-restart-loop-detector.ts
new file mode 100644
index 0000000000..5431cde195
--- /dev/null
+++ b/packages/cli/src/runners/task-runner-process-restart-loop-detector.ts
@@ -0,0 +1,73 @@
+import { Time } from '@/constants';
+import { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
+import type { TaskRunnerProcess } from '@/runners/task-runner-process';
+import { TypedEmitter } from '@/typed-emitter';
+
+const MAX_RESTARTS = 5;
+const RESTARTS_WINDOW = 2 * Time.seconds.toMilliseconds;
+
+type TaskRunnerProcessRestartLoopDetectorEventMap = {
+	'restart-loop-detected': TaskRunnerRestartLoopError;
+};
+
+/**
+ * Monitors the task runner process and emits `restart-loop-detected` when it exits too often
+ */
+export class TaskRunnerProcessRestartLoopDetector extends TypedEmitter<TaskRunnerProcessRestartLoopDetectorEventMap> {
+	/**
+	 * How many times the process needs to restart for it to be detected
+	 * being in a loop.
+	 */
+	private readonly maxCount = MAX_RESTARTS;
+
+	/**
+	 * The time interval in which the process needs to restart `maxCount` times
+	 * to be detected as being in a loop.
+	 */
+	private readonly restartsWindow = RESTARTS_WINDOW;
+
+	private numRestarts = 0;
+
+	/** Time when the first restart of a loop happened within a time window */
+	private firstRestartedAt = Date.now();
+
+	constructor(private readonly taskRunnerProcess: TaskRunnerProcess) {
+		super();
+
+		this.taskRunnerProcess.on('exit', () => {
+			this.increment();
+
+			if (this.isMaxCountExceeded()) {
+				this.emit(
+					'restart-loop-detected',
+					new TaskRunnerRestartLoopError(this.numRestarts, this.msSinceFirstIncrement()),
+				);
+			}
+		});
+	}
+
+	/**
+	 * Counts one more restart, first starting a fresh window if the current one has elapsed
+	 */
+	private increment() {
+		const now = Date.now();
+		if (now > this.firstRestartedAt + this.restartsWindow) {
+			this.reset();
+		}
+
+		this.numRestarts++;
+	}
+
+	private reset() {
+		this.numRestarts = 0;
+		this.firstRestartedAt = Date.now();
+	}
+
+	private isMaxCountExceeded() {
+		return this.numRestarts >= this.maxCount;
+	}
+
+	private msSinceFirstIncrement() {
+		return Date.now() - this.firstRestartedAt;
+	}
+}
diff --git a/packages/cli/test/integration/runners/task-runner-module.external.test.ts b/packages/cli/test/integration/runners/task-runner-module.external.test.ts
index bdabdf56ae..8e872c25ec 100644
--- a/packages/cli/test/integration/runners/task-runner-module.external.test.ts
+++ b/packages/cli/test/integration/runners/task-runner-module.external.test.ts
@@ -1,4 +1,5 @@
 import { TaskRunnersConfig } from '@n8n/config';
+import { mock } from 'jest-mock-extended';
 import Container from 'typedi';
 
 import { MissingAuthTokenError } from '@/runners/errors/missing-auth-token.error';
@@ -32,7 +33,7 @@ describe('TaskRunnerModule in external mode', () => {
 			runnerConfig.enabled = true;
 			runnerConfig.authToken = '';
 
-			const module = new TaskRunnerModule(runnerConfig);
+			const module = new TaskRunnerModule(mock(), runnerConfig);
 
 			await expect(module.start()).rejects.toThrowError(MissingAuthTokenError);
 		});
diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts
index 3b6c8aec8c..b21ef68640 100644
--- a/packages/cli/test/integration/runners/task-runner-process.test.ts
+++ b/packages/cli/test/integration/runners/task-runner-process.test.ts
@@ -3,6 +3,7 @@ import Container from 'typedi';
 import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
 import { TaskBroker } from '@/runners/task-broker.service';
 import { TaskRunnerProcess } from '@/runners/task-runner-process';
+import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
 import { retryUntil } from '@test-integration/retry-until';
 import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
 
@@ -84,4 +85,33 @@ describe('TaskRunnerProcess', () => {
 		expect(getNumRegisteredRunners()).toBe(1);
 		expect(runnerProcess.pid).not.toBe(processId);
 	});
+
+	it('should work together with restart loop detector', async () => {
+		// Arrange
+		const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(runnerProcess);
+		let restartLoopDetectedEventEmitted = false;
+		restartLoopDetector.once('restart-loop-detected', () => {
+			restartLoopDetectedEventEmitted = true;
+		});
+
+		// Act
+		await runnerProcess.start();
+
+		// Simulate a restart loop
+		for (let i = 0; i < 5; i++) {
+			await retryUntil(() => {
+				expect(runnerProcess.pid).toBeDefined();
+			});
+
+			// @ts-expect-error private property
+			runnerProcess.process?.kill();
+
+			await new Promise((resolve) => {
+				runnerProcess.once('exit', resolve);
+			});
+		}
+
+		// Assert
+		expect(restartLoopDetectedEventEmitted).toBe(true);
+	});
 });