From 04889864a09aba910998c95e0e47b7be4285c799 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Sep 2025 12:33:25 +0200 Subject: [PATCH] feat(core): Add internal mode for native Python runner (no-changelog) (#19288) --- .../@n8n/config/src/configs/logging.config.ts | 5 +- .../@n8n/task-runner-python/src/constants.py | 2 +- ...nner-process-restart-loop-detector.test.ts | 4 +- .../__tests__/task-runner-process.test.ts | 14 +- .../errors/missing-requirements.error.ts | 19 ++ ...nternal-task-runner-disconnect-analyzer.ts | 6 +- .../src/task-runners/task-runner-module.ts | 57 +++-- .../task-runners/task-runner-process-base.ts | 123 +++++++++++ .../task-runners/task-runner-process-js.ts | 89 ++++++++ .../task-runners/task-runner-process-py.ts | 68 ++++++ ...sk-runner-process-restart-loop-detector.ts | 5 +- .../src/task-runners/task-runner-process.ts | 201 ------------------ .../integration/commands/worker.cmd.test.ts | 4 +- .../task-runners/task-runner-process.test.ts | 4 +- 14 files changed, 363 insertions(+), 238 deletions(-) create mode 100644 packages/cli/src/task-runners/errors/missing-requirements.error.ts create mode 100644 packages/cli/src/task-runners/task-runner-process-base.ts create mode 100644 packages/cli/src/task-runners/task-runner-process-js.ts create mode 100644 packages/cli/src/task-runners/task-runner-process-py.ts delete mode 100644 packages/cli/src/task-runners/task-runner-process.ts diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index a68675c153..9852c842be 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -16,6 +16,8 @@ export const LOG_SCOPES = [ 'scaling', 'waiting-executions', 'task-runner', + 'task-runner-js', + 'task-runner-py', 'insights', 'workflow-activation', 'ssh-client', @@ -112,7 +114,8 @@ export class LoggingConfig { * - `redis` * - `scaling` * - `waiting-executions` - * - `task-runner` + * - `task-runner-js` + * - `task-runner-py` * - `workflow-activation` * - `insights` * diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py index 437c754214..2464f12b42 100644 --- a/packages/@n8n/task-runner-python/src/constants.py +++ b/packages/@n8n/task-runner-python/src/constants.py @@ -19,7 +19,7 @@ RUNNER_NAME = "Python Task Runner" DEFAULT_MAX_CONCURRENCY = 5 # tasks DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 * 1024 # 1 GiB DEFAULT_TASK_TIMEOUT = 60 # seconds -DEFAULT_AUTO_SHUTDOWN_TIMEOUT = 15 # seconds +DEFAULT_AUTO_SHUTDOWN_TIMEOUT = 0 # seconds DEFAULT_SHUTDOWN_TIMEOUT = 10 # seconds OFFER_INTERVAL = 0.25 # 250ms OFFER_VALIDITY = 5000 # ms diff --git a/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts b/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts index 6b4b906f36..227c37dc42 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts +++ b/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts @@ -5,14 +5,14 @@ import { mock } from 'jest-mock-extended'; import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service'; import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; -import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; +import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; describe('TaskRunnerProcessRestartLoopDetector', () => { const mockLogger = mock(); const mockAuthService = mock(); const runnerConfig = new TaskRunnersConfig(); - const taskRunnerProcess = new TaskRunnerProcess( + const taskRunnerProcess = new JsTaskRunnerProcess( mockLogger, runnerConfig, mockAuthService, diff --git a/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts b/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts index 45c204caff..86e17e515c 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts @@ -5,7 +5,7 @@ import { mock } from 'jest-mock-extended'; import type { ChildProcess, SpawnOptions } from 'node:child_process'; import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service'; -import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; +import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js'; import type { TaskRunnerLifecycleEvents } from '../task-runner-lifecycle-events'; @@ -28,7 +28,7 @@ describe('TaskRunnerProcess', () => { runnerConfig.mode = 'internal'; runnerConfig.insecureMode = false; const authService = mock(); - let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); + let taskRunnerProcess = new JsTaskRunnerProcess(logger, runnerConfig, authService, mock()); afterEach(async () => { spawnMock.mockClear(); @@ -38,14 +38,14 @@ describe('TaskRunnerProcess', () => { it('should throw if runner mode is external', () => { runnerConfig.mode = 'external'; - expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow(); + expect(() => new JsTaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow(); runnerConfig.mode = 'internal'; }); it('should register listener for `runner:failed-heartbeat-check` event', () => { const runnerLifecycleEvents = mock(); - new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + new JsTaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( 'runner:failed-heartbeat-check', @@ -55,7 +55,7 @@ describe('TaskRunnerProcess', () => { it('should register listener for `runner:timed-out-during-task` event', () => { const runnerLifecycleEvents = mock(); - new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + new JsTaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( 'runner:timed-out-during-task', @@ -66,7 +66,7 @@ describe('TaskRunnerProcess', () => { describe('start', () => { beforeEach(() => { - taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); + taskRunnerProcess = new JsTaskRunnerProcess(logger, runnerConfig, authService, mock()); }); test.each([ @@ -166,7 +166,7 @@ describe('TaskRunnerProcess', () => { it('on insecure mode, should not use --disallow-code-generation-from-strings and --disable-proto=delete flags', async () => { jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken'); runnerConfig.insecureMode = true; - const insecureTaskRunnerProcess = new TaskRunnerProcess( + const insecureTaskRunnerProcess = new JsTaskRunnerProcess( logger, runnerConfig, authService, diff --git a/packages/cli/src/task-runners/errors/missing-requirements.error.ts b/packages/cli/src/task-runners/errors/missing-requirements.error.ts new file mode 100644 index 0000000000..eeb5fdb37c --- /dev/null +++ b/packages/cli/src/task-runners/errors/missing-requirements.error.ts @@ -0,0 +1,19 @@ +import { UserError } from 'n8n-workflow'; + +const ERROR_MESSAGE = 'Failed to start Python task runner in internal mode.'; + +type ReasonId = 'python' | 'venv'; + +const HINT = + 'Launching a Python runner in internal mode is intended only for debugging and is not recommended for production. Users are encouraged to deploy in external mode. See: https://docs.n8n.io/hosting/configuration/task-runners/#setting-up-external-mode'; + +export class MissingRequirementsError extends UserError { + constructor(reasonId: ReasonId) { + const reason = { + python: 'because Python 3 is missing from this system.', + venv: 'because its virtual environment is missing from this system.', + }[reasonId]; + + super([ERROR_MESSAGE, reason, HINT].join(' ')); + } +} diff --git a/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts b/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts index 386cab23fd..531ada01d8 100644 --- a/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts @@ -6,8 +6,8 @@ import type { DisconnectErrorOptions } from '@/task-runners/task-broker/task-bro import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { TaskRunnerOomError } from './errors/task-runner-oom-error'; import { SlidingWindowSignal } from './sliding-window-signal'; -import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; -import { TaskRunnerProcess } from './task-runner-process'; +import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process-base'; +import { JsTaskRunnerProcess } from './task-runner-process-js'; /** * Analyzes the disconnect reason of a task runner process to provide a more @@ -19,7 +19,7 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco constructor( private readonly runnerConfig: TaskRunnersConfig, - private readonly taskRunnerProcess: TaskRunnerProcess, + private readonly taskRunnerProcess: JsTaskRunnerProcess, ) { super(); diff --git a/packages/cli/src/task-runners/task-runner-module.ts b/packages/cli/src/task-runners/task-runner-module.ts index d01c3c9515..0e5f67256e 100644 --- a/packages/cli/src/task-runners/task-runner-module.ts +++ b/packages/cli/src/task-runners/task-runner-module.ts @@ -8,7 +8,8 @@ import * as a from 'node:assert/strict'; import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; -import type { TaskRunnerProcess } from '@/task-runners/task-runner-process'; +import type { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js'; +import type { PyTaskRunnerProcess } from '@/task-runners/task-runner-process-py'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { MissingAuthTokenError } from './errors/missing-auth-token.error'; @@ -28,9 +29,13 @@ export class TaskRunnerModule { private taskRequester: LocalTaskRequester | undefined; - private taskRunnerProcess: TaskRunnerProcess | undefined; + private jsRunnerProcess: JsTaskRunnerProcess | undefined; - private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined; + private pyRunnerProcess: PyTaskRunnerProcess | undefined; + + private jsRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined; + + private pyRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined; constructor( private readonly logger: Logger, @@ -50,17 +55,22 @@ export class TaskRunnerModule { await this.loadTaskRequester(); await this.loadTaskBroker(); - if (mode === 'internal') { - await this.startInternalTaskRunner(); - } + if (mode === 'internal') await this.startInternalTaskRunners(); } @OnShutdown() async stop() { const stopRunnerProcessTask = (async () => { - if (this.taskRunnerProcess) { - await this.taskRunnerProcess.stop(); - this.taskRunnerProcess = undefined; + if (this.jsRunnerProcess) { + await this.jsRunnerProcess.stop(); + this.jsRunnerProcess = undefined; + } + })(); + + const stopPythonRunnerProcessTask = (async () => { + if (this.pyRunnerProcess) { + await this.pyRunnerProcess.stop(); + this.pyRunnerProcess = undefined; } })(); @@ -71,7 +81,7 @@ export class TaskRunnerModule { } })(); - await Promise.all([stopRunnerProcessTask, stopRunnerServerTask]); + await Promise.all([stopRunnerProcessTask, stopPythonRunnerProcessTask, stopRunnerServerTask]); } private async loadTaskRequester() { @@ -93,20 +103,33 @@ export class TaskRunnerModule { await this.taskBrokerHttpServer.start(); } - private async startInternalTaskRunner() { + private async startInternalTaskRunners() { a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded'); - const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process'); - this.taskRunnerProcess = Container.get(TaskRunnerProcess); - this.taskRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector( - this.taskRunnerProcess, + const { JsTaskRunnerProcess } = await import('@/task-runners/task-runner-process-js'); + this.jsRunnerProcess = Container.get(JsTaskRunnerProcess); + this.jsRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector( + this.jsRunnerProcess, ); - this.taskRunnerProcessRestartLoopDetector.on( + this.jsRunnerProcessRestartLoopDetector.on( 'restart-loop-detected', this.onRunnerRestartLoopDetected, ); - await this.taskRunnerProcess.start(); + await this.jsRunnerProcess.start(); + + if (process.env.N8N_NATIVE_PYTHON_RUNNER === 'true') { + const { PyTaskRunnerProcess } = await import('@/task-runners/task-runner-process-py'); + this.pyRunnerProcess = Container.get(PyTaskRunnerProcess); + this.pyRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector( + this.pyRunnerProcess, + ); + this.pyRunnerProcessRestartLoopDetector.on( + 'restart-loop-detected', + this.onRunnerRestartLoopDetected, + ); + await this.pyRunnerProcess.start(); + } const { InternalTaskRunnerDisconnectAnalyzer } = await import( '@/task-runners/internal-task-runner-disconnect-analyzer' diff --git a/packages/cli/src/task-runners/task-runner-process-base.ts b/packages/cli/src/task-runners/task-runner-process-base.ts new file mode 100644 index 0000000000..94fd212fb9 --- /dev/null +++ b/packages/cli/src/task-runners/task-runner-process-base.ts @@ -0,0 +1,123 @@ +import { Logger } from '@n8n/backend-common'; +import { LogScope, TaskRunnersConfig } from '@n8n/config'; +import { OnShutdown } from '@n8n/decorators'; +import { Service } from '@n8n/di'; +import assert from 'node:assert/strict'; +import { spawn } from 'node:child_process'; + +import { TypedEmitter } from '@/typed-emitter'; + +import { forwardToLogger } from './forward-to-logger'; +import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; +import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; + +export type ChildProcess = ReturnType; + +export type ExitReason = 'unknown' | 'oom'; + +export type TaskRunnerProcessEventMap = { + exit: { + reason: ExitReason; + }; +}; + +@Service() +export abstract class TaskRunnerProcessBase extends TypedEmitter { + protected readonly name: string; + + protected readonly loggerScope: LogScope; + + protected process: ChildProcess | null = null; + + protected _runPromise: Promise | null = null; + + protected isShuttingDown = false; + + constructor( + protected readonly logger: Logger, + protected readonly runnerConfig: TaskRunnersConfig, + protected readonly authService: TaskBrokerAuthService, + protected readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, + ) { + super(); + this.logger = logger.scoped(this.loggerScope); + + this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => { + this.logger.warn('Task runner failed heartbeat check, restarting...'); + void this.forceRestart(); + }); + + this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => { + this.logger.warn('Task runner timed out during task, restarting...'); + void this.forceRestart(); + }); + } + + get isRunning() { + return this.process !== null; + } + + get pid() { + return this.process?.pid; + } + + get runPromise() { + return this._runPromise; + } + + get isInternal() { + return this.runnerConfig.mode === 'internal'; + } + + async start() { + assert(!this.process, `${this.name} already running`); + + const grantToken = await this.authService.createGrantToken(); + const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`; + + this.process = await this.startProcess(grantToken, taskBrokerUri); + forwardToLogger(this.logger, this.process, `[${this.name}] `); + this.monitorProcess(this.process); + } + + @OnShutdown() + async stop() { + if (!this.process) return; + + this.isShuttingDown = true; + this.process.kill(); + await this._runPromise; + this.isShuttingDown = false; + } + + /** Force-restart a task runner process suspected of being unresponsive. */ + protected async forceRestart() { + if (!this.process) return; + + this.process.kill('SIGKILL'); + await this._runPromise; + } + + protected onProcessExit(code: number | null, resolveFn: () => void) { + this.process = null; + const exitReason = this.analyzeExitReason?.(code) ?? { reason: 'unknown' }; + this.emit('exit', exitReason); + resolveFn(); + if (!this.isShuttingDown) { + setImmediate(async () => await this.start()); + } + } + + protected monitorProcess(taskRunnerProcess: ChildProcess) { + this._runPromise = new Promise((resolve) => { + this.setupProcessMonitoring?.(taskRunnerProcess); + taskRunnerProcess.on('exit', (code) => { + this.onProcessExit(code, resolve); + }); + }); + } + + abstract startProcess(grantToken: string, taskBrokerUri: string): Promise; + setupProcessMonitoring?(process: ChildProcess): void; + analyzeExitReason?(code: number | null): { reason: ExitReason }; +} diff --git a/packages/cli/src/task-runners/task-runner-process-js.ts b/packages/cli/src/task-runners/task-runner-process-js.ts new file mode 100644 index 0000000000..5eb2862b07 --- /dev/null +++ b/packages/cli/src/task-runners/task-runner-process-js.ts @@ -0,0 +1,89 @@ +import { Logger } from '@n8n/backend-common'; +import { TaskRunnersConfig } from '@n8n/config'; +import { Service } from '@n8n/di'; +import assert from 'node:assert/strict'; +import { spawn } from 'node:child_process'; +import * as process from 'node:process'; + +import { NodeProcessOomDetector } from './node-process-oom-detector'; +import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; +import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; +import { ChildProcess, ExitReason, TaskRunnerProcessBase } from './task-runner-process-base'; + +/** + * Responsible for managing a JavaScript task runner as a child process. + * This is for internal mode, which is NOT recommended for production. + */ +@Service() +export class JsTaskRunnerProcess extends TaskRunnerProcessBase { + readonly name = 'runnner:js'; + + readonly loggerScope = 'task-runner-js'; + + private oomDetector: NodeProcessOomDetector | null = null; + + constructor( + readonly logger: Logger, + readonly runnerConfig: TaskRunnersConfig, + readonly authService: TaskBrokerAuthService, + readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, + ) { + super(logger, runnerConfig, authService, runnerLifecycleEvents); + + assert(this.isInternal, `${this.constructor.name} cannot be used in external mode`); + } + + async startProcess(grantToken: string, taskBrokerUri: string): Promise { + const startScript = require.resolve('@n8n/task-runner/start'); + const flags = this.runnerConfig.insecureMode + ? [] + : ['--disallow-code-generation-from-strings', '--disable-proto=delete']; + + return spawn('node', [...flags, startScript], { + env: this.getProcessEnvVars(grantToken, taskBrokerUri), + }); + } + + setupProcessMonitoring(process: ChildProcess) { + this.oomDetector = new NodeProcessOomDetector(process); + } + + analyzeExitReason(): { reason: ExitReason } { + return { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }; + } + + private getProcessEnvVars(grantToken: string, taskBrokerUri: string) { + const envVars: Record = { + // system environment + PATH: process.env.PATH, + HOME: process.env.HOME, + NODE_PATH: process.env.NODE_PATH, + + // n8n + GENERIC_TIMEZONE: process.env.GENERIC_TIMEZONE, + NODE_FUNCTION_ALLOW_BUILTIN: process.env.NODE_FUNCTION_ALLOW_BUILTIN, + NODE_FUNCTION_ALLOW_EXTERNAL: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, + + // sentry + N8N_SENTRY_DSN: process.env.N8N_SENTRY_DSN, + N8N_VERSION: process.env.N8N_VERSION, + ENVIRONMENT: process.env.ENVIRONMENT, + DEPLOYMENT_NAME: process.env.DEPLOYMENT_NAME, + + // runner + N8N_RUNNERS_GRANT_TOKEN: grantToken, + N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri, + N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), + N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), + N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(), + N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(), + N8N_RUNNERS_INSECURE_MODE: process.env.N8N_RUNNERS_INSECURE_MODE, + }; + + if (this.runnerConfig.maxOldSpaceSize) { + envVars.NODE_OPTIONS = `--max-old-space-size=${this.runnerConfig.maxOldSpaceSize}`; + } + + return envVars; + } +} diff --git a/packages/cli/src/task-runners/task-runner-process-py.ts b/packages/cli/src/task-runners/task-runner-process-py.ts new file mode 100644 index 0000000000..a7ae3ff5cd --- /dev/null +++ b/packages/cli/src/task-runners/task-runner-process-py.ts @@ -0,0 +1,68 @@ +import { Logger } from '@n8n/backend-common'; +import { TaskRunnersConfig } from '@n8n/config'; +import { Service } from '@n8n/di'; +import { exec, spawn } from 'node:child_process'; +import { access } from 'node:fs/promises'; +import path from 'node:path'; +import { promisify } from 'node:util'; + +import { MissingRequirementsError } from './errors/missing-requirements.error'; +import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; +import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; +import { TaskRunnerProcessBase } from './task-runner-process-base'; + +const asyncExec = promisify(exec); + +/** + * Responsible for managing a Python task runner as a child process. + * This is for internal mode, which is NOT recommended for production. + */ +@Service() +export class PyTaskRunnerProcess extends TaskRunnerProcessBase { + protected readonly name = 'runner:py'; + + protected readonly loggerScope = 'task-runner-py'; + + constructor( + readonly logger: Logger, + readonly runnerConfig: TaskRunnersConfig, + readonly authService: TaskBrokerAuthService, + readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, + ) { + super(logger, runnerConfig, authService, runnerLifecycleEvents); + } + + async startProcess(grantToken: string, taskBrokerUri: string) { + try { + await asyncExec('python3 --version', { timeout: 5000 }); + } catch { + throw new MissingRequirementsError('python'); + } + + const pythonDir = path.join(__dirname, '../../../@n8n/task-runner-python'); + const venvPath = path.join(pythonDir, '.venv/bin/python'); + + try { + await access(venvPath); + } catch { + throw new MissingRequirementsError('venv'); + } + + return spawn(venvPath, ['-m', 'src.main'], { + cwd: pythonDir, + env: { + // system environment + PATH: process.env.PATH, + HOME: process.env.HOME, + + // runner + N8N_RUNNERS_GRANT_TOKEN: grantToken, + N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri, + N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), + N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), + N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(), + N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(), + }, + }); + } +} diff --git a/packages/cli/src/task-runners/task-runner-process-restart-loop-detector.ts b/packages/cli/src/task-runners/task-runner-process-restart-loop-detector.ts index 23a0ba79b5..9ed70fbd9c 100644 --- a/packages/cli/src/task-runners/task-runner-process-restart-loop-detector.ts +++ b/packages/cli/src/task-runners/task-runner-process-restart-loop-detector.ts @@ -1,9 +1,10 @@ import { Time } from '@n8n/constants'; import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; -import type { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TypedEmitter } from '@/typed-emitter'; +import type { TaskRunnerProcessBase } from './task-runner-process-base'; + const MAX_RESTARTS = 5; const RESTARTS_WINDOW = 2 * Time.seconds.toMilliseconds; @@ -32,7 +33,7 @@ export class TaskRunnerProcessRestartLoopDetector extends TypedEmitter { diff --git a/packages/cli/src/task-runners/task-runner-process.ts b/packages/cli/src/task-runners/task-runner-process.ts deleted file mode 100644 index 86ebe3c923..0000000000 --- a/packages/cli/src/task-runners/task-runner-process.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { Logger } from '@n8n/backend-common'; -import { TaskRunnersConfig } from '@n8n/config'; -import { OnShutdown } from '@n8n/decorators'; -import { Service } from '@n8n/di'; -import * as a from 'node:assert/strict'; -import { spawn } from 'node:child_process'; -import * as process from 'node:process'; - -import { forwardToLogger } from './forward-to-logger'; -import { NodeProcessOomDetector } from './node-process-oom-detector'; -import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; -import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; -import { TypedEmitter } from '../typed-emitter'; - -type ChildProcess = ReturnType; - -export type ExitReason = 'unknown' | 'oom'; - -export type TaskRunnerProcessEventMap = { - exit: { - reason: ExitReason; - }; -}; - -/** - * Manages the JS task runner process as a child process - */ -@Service() -export class TaskRunnerProcess extends TypedEmitter { - get isRunning() { - return this.process !== null; - } - - /** The process ID of the task runner process */ - get pid() { - return this.process?.pid; - } - - /** Promise that resolves when the process has exited */ - get runPromise() { - return this._runPromise; - } - - private process: ChildProcess | null = null; - - private _runPromise: Promise | null = null; - - private oomDetector: NodeProcessOomDetector | null = null; - - private isShuttingDown = false; - - private logger: Logger; - - /** Environment variables inherited by the child process from the current environment. */ - private readonly passthroughEnvVars = [ - 'PATH', - 'HOME', // So home directory can be resolved correctly - 'GENERIC_TIMEZONE', - 'NODE_FUNCTION_ALLOW_BUILTIN', - 'NODE_FUNCTION_ALLOW_EXTERNAL', - 'N8N_SENTRY_DSN', - 'N8N_RUNNERS_INSECURE_MODE', - // Metadata about the environment - 'N8N_VERSION', - 'ENVIRONMENT', - 'DEPLOYMENT_NAME', - 'NODE_PATH', - ] as const; - - private readonly mode: 'insecure' | 'secure' = 'secure'; - - constructor( - logger: Logger, - private readonly runnerConfig: TaskRunnersConfig, - private readonly authService: TaskBrokerAuthService, - private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, - ) { - super(); - - a.ok( - this.runnerConfig.mode !== 'external', - 'Task Runner Process cannot be used in external mode', - ); - - this.mode = this.runnerConfig.insecureMode ? 'insecure' : 'secure'; - - this.logger = logger.scoped('task-runner'); - - this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => { - this.logger.warn('Task runner failed heartbeat check, restarting...'); - void this.forceRestart(); - }); - - this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => { - this.logger.warn('Task runner timed out during task, restarting...'); - void this.forceRestart(); - }); - } - - async start() { - a.ok(!this.process, 'Task Runner Process already running'); - - const grantToken = await this.authService.createGrantToken(); - - const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`; - this.process = this.startNode(grantToken, taskBrokerUri); - - forwardToLogger(this.logger, this.process, '[Task Runner]: '); - - this.monitorProcess(this.process); - } - - startNode(grantToken: string, taskBrokerUri: string) { - const startScript = require.resolve('@n8n/task-runner/start'); - - const flags = - this.mode === 'secure' - ? ['--disallow-code-generation-from-strings', '--disable-proto=delete'] - : []; - - return spawn('node', [...flags, startScript], { - env: this.getProcessEnvVars(grantToken, taskBrokerUri), - }); - } - - @OnShutdown() - async stop() { - if (!this.process) return; - - this.isShuttingDown = true; - - // TODO: Timeout & force kill - this.killNode(); - await this._runPromise; - - this.isShuttingDown = false; - } - - /** Force-restart a runner suspected of being unresponsive. */ - async forceRestart() { - if (!this.process) return; - - this.process.kill('SIGKILL'); - - await this._runPromise; - } - - killNode() { - if (!this.process) return; - - this.process.kill(); - } - - private monitorProcess(taskRunnerProcess: ChildProcess) { - this._runPromise = new Promise((resolve) => { - this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess); - - taskRunnerProcess.on('exit', (code) => { - this.onProcessExit(code, resolve); - }); - }); - } - - private onProcessExit(_code: number | null, resolveFn: () => void) { - this.process = null; - this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }); - resolveFn(); - - if (!this.isShuttingDown) { - setImmediate(async () => await this.start()); - } - } - - private getProcessEnvVars(grantToken: string, taskBrokerUri: string) { - const envVars: Record = { - N8N_RUNNERS_GRANT_TOKEN: grantToken, - N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri, - N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), - N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), - N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(), - N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(), - ...this.getPassthroughEnvVars(), - }; - - if (this.runnerConfig.maxOldSpaceSize) { - envVars.NODE_OPTIONS = `--max-old-space-size=${this.runnerConfig.maxOldSpaceSize}`; - } - - return envVars; - } - - private getPassthroughEnvVars() { - return this.passthroughEnvVars.reduce>((env, key) => { - if (process.env[key]) { - env[key] = process.env[key]; - } - - return env; - }, {}); - } -} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 6a1c1e3175..d4b1cba89f 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -18,7 +18,7 @@ import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server'; -import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; +import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js'; import { Telemetry } from '@/telemetry'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -34,7 +34,7 @@ const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); const taskBrokerServer = mockInstance(TaskBrokerServer); -const taskRunnerProcess = mockInstance(TaskRunnerProcess); +const taskRunnerProcess = mockInstance(JsTaskRunnerProcess); mockInstance(Publisher); mockInstance(Subscriber); mockInstance(Telemetry); diff --git a/packages/cli/test/integration/task-runners/task-runner-process.test.ts b/packages/cli/test/integration/task-runners/task-runner-process.test.ts index 6e5100e24a..38508ff9a8 100644 --- a/packages/cli/test/integration/task-runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/task-runners/task-runner-process.test.ts @@ -2,7 +2,7 @@ import { Container } from '@n8n/di'; import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; import { TaskBroker } from '@/task-runners/task-broker/task-broker.service'; -import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; +import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { retryUntil } from '@test-integration/retry-until'; import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; @@ -11,7 +11,7 @@ describe('TaskRunnerProcess', () => { const { config, server: taskRunnerServer } = setupBrokerTestServer({ mode: 'internal', }); - const runnerProcess = Container.get(TaskRunnerProcess); + const runnerProcess = Container.get(JsTaskRunnerProcess); const taskBroker = Container.get(TaskBroker); const taskRunnerService = Container.get(TaskBrokerWsServer);