mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
feat(core): Add internal mode for native Python runner (no-changelog) (#19288)
This commit is contained in:
@@ -16,6 +16,8 @@ export const LOG_SCOPES = [
|
|||||||
'scaling',
|
'scaling',
|
||||||
'waiting-executions',
|
'waiting-executions',
|
||||||
'task-runner',
|
'task-runner',
|
||||||
|
'task-runner-js',
|
||||||
|
'task-runner-py',
|
||||||
'insights',
|
'insights',
|
||||||
'workflow-activation',
|
'workflow-activation',
|
||||||
'ssh-client',
|
'ssh-client',
|
||||||
@@ -112,7 +114,8 @@ export class LoggingConfig {
|
|||||||
* - `redis`
|
* - `redis`
|
||||||
* - `scaling`
|
* - `scaling`
|
||||||
* - `waiting-executions`
|
* - `waiting-executions`
|
||||||
* - `task-runner`
|
* - `task-runner-js`
|
||||||
|
* - `task-runner-py`
|
||||||
* - `workflow-activation`
|
* - `workflow-activation`
|
||||||
* - `insights`
|
* - `insights`
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ RUNNER_NAME = "Python Task Runner"
|
|||||||
DEFAULT_MAX_CONCURRENCY = 5 # tasks
|
DEFAULT_MAX_CONCURRENCY = 5 # tasks
|
||||||
DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 * 1024 # 1 GiB
|
DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 * 1024 # 1 GiB
|
||||||
DEFAULT_TASK_TIMEOUT = 60 # seconds
|
DEFAULT_TASK_TIMEOUT = 60 # seconds
|
||||||
DEFAULT_AUTO_SHUTDOWN_TIMEOUT = 15 # seconds
|
DEFAULT_AUTO_SHUTDOWN_TIMEOUT = 0 # seconds
|
||||||
DEFAULT_SHUTDOWN_TIMEOUT = 10 # seconds
|
DEFAULT_SHUTDOWN_TIMEOUT = 10 # seconds
|
||||||
OFFER_INTERVAL = 0.25 # 250ms
|
OFFER_INTERVAL = 0.25 # 250ms
|
||||||
OFFER_VALIDITY = 5000 # ms
|
OFFER_VALIDITY = 5000 # ms
|
||||||
|
|||||||
@@ -5,14 +5,14 @@ import { mock } from 'jest-mock-extended';
|
|||||||
import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
||||||
import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
|
import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
|
||||||
import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
|
import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
|
||||||
import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js';
|
||||||
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
||||||
|
|
||||||
describe('TaskRunnerProcessRestartLoopDetector', () => {
|
describe('TaskRunnerProcessRestartLoopDetector', () => {
|
||||||
const mockLogger = mock<Logger>();
|
const mockLogger = mock<Logger>();
|
||||||
const mockAuthService = mock<TaskBrokerAuthService>();
|
const mockAuthService = mock<TaskBrokerAuthService>();
|
||||||
const runnerConfig = new TaskRunnersConfig();
|
const runnerConfig = new TaskRunnersConfig();
|
||||||
const taskRunnerProcess = new TaskRunnerProcess(
|
const taskRunnerProcess = new JsTaskRunnerProcess(
|
||||||
mockLogger,
|
mockLogger,
|
||||||
runnerConfig,
|
runnerConfig,
|
||||||
mockAuthService,
|
mockAuthService,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import { mock } from 'jest-mock-extended';
|
|||||||
import type { ChildProcess, SpawnOptions } from 'node:child_process';
|
import type { ChildProcess, SpawnOptions } from 'node:child_process';
|
||||||
|
|
||||||
import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
|
import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
|
||||||
import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js';
|
||||||
|
|
||||||
import type { TaskRunnerLifecycleEvents } from '../task-runner-lifecycle-events';
|
import type { TaskRunnerLifecycleEvents } from '../task-runner-lifecycle-events';
|
||||||
|
|
||||||
@@ -28,7 +28,7 @@ describe('TaskRunnerProcess', () => {
|
|||||||
runnerConfig.mode = 'internal';
|
runnerConfig.mode = 'internal';
|
||||||
runnerConfig.insecureMode = false;
|
runnerConfig.insecureMode = false;
|
||||||
const authService = mock<TaskBrokerAuthService>();
|
const authService = mock<TaskBrokerAuthService>();
|
||||||
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
|
let taskRunnerProcess = new JsTaskRunnerProcess(logger, runnerConfig, authService, mock());
|
||||||
|
|
||||||
afterEach(async () => {
|
afterEach(async () => {
|
||||||
spawnMock.mockClear();
|
spawnMock.mockClear();
|
||||||
@@ -38,14 +38,14 @@ describe('TaskRunnerProcess', () => {
|
|||||||
it('should throw if runner mode is external', () => {
|
it('should throw if runner mode is external', () => {
|
||||||
runnerConfig.mode = 'external';
|
runnerConfig.mode = 'external';
|
||||||
|
|
||||||
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow();
|
expect(() => new JsTaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow();
|
||||||
|
|
||||||
runnerConfig.mode = 'internal';
|
runnerConfig.mode = 'internal';
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should register listener for `runner:failed-heartbeat-check` event', () => {
|
it('should register listener for `runner:failed-heartbeat-check` event', () => {
|
||||||
const runnerLifecycleEvents = mock<TaskRunnerLifecycleEvents>();
|
const runnerLifecycleEvents = mock<TaskRunnerLifecycleEvents>();
|
||||||
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
new JsTaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||||
|
|
||||||
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
||||||
'runner:failed-heartbeat-check',
|
'runner:failed-heartbeat-check',
|
||||||
@@ -55,7 +55,7 @@ describe('TaskRunnerProcess', () => {
|
|||||||
|
|
||||||
it('should register listener for `runner:timed-out-during-task` event', () => {
|
it('should register listener for `runner:timed-out-during-task` event', () => {
|
||||||
const runnerLifecycleEvents = mock<TaskRunnerLifecycleEvents>();
|
const runnerLifecycleEvents = mock<TaskRunnerLifecycleEvents>();
|
||||||
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
new JsTaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||||
|
|
||||||
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
||||||
'runner:timed-out-during-task',
|
'runner:timed-out-during-task',
|
||||||
@@ -66,7 +66,7 @@ describe('TaskRunnerProcess', () => {
|
|||||||
|
|
||||||
describe('start', () => {
|
describe('start', () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
|
taskRunnerProcess = new JsTaskRunnerProcess(logger, runnerConfig, authService, mock());
|
||||||
});
|
});
|
||||||
|
|
||||||
test.each([
|
test.each([
|
||||||
@@ -166,7 +166,7 @@ describe('TaskRunnerProcess', () => {
|
|||||||
it('on insecure mode, should not use --disallow-code-generation-from-strings and --disable-proto=delete flags', async () => {
|
it('on insecure mode, should not use --disallow-code-generation-from-strings and --disable-proto=delete flags', async () => {
|
||||||
jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken');
|
jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken');
|
||||||
runnerConfig.insecureMode = true;
|
runnerConfig.insecureMode = true;
|
||||||
const insecureTaskRunnerProcess = new TaskRunnerProcess(
|
const insecureTaskRunnerProcess = new JsTaskRunnerProcess(
|
||||||
logger,
|
logger,
|
||||||
runnerConfig,
|
runnerConfig,
|
||||||
authService,
|
authService,
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
import { UserError } from 'n8n-workflow';
|
||||||
|
|
||||||
|
const ERROR_MESSAGE = 'Failed to start Python task runner in internal mode.';
|
||||||
|
|
||||||
|
type ReasonId = 'python' | 'venv';
|
||||||
|
|
||||||
|
const HINT =
|
||||||
|
'Launching a Python runner in internal mode is intended only for debugging and is not recommended for production. Users are encouraged to deploy in external mode. See: https://docs.n8n.io/hosting/configuration/task-runners/#setting-up-external-mode';
|
||||||
|
|
||||||
|
export class MissingRequirementsError extends UserError {
|
||||||
|
constructor(reasonId: ReasonId) {
|
||||||
|
const reason = {
|
||||||
|
python: 'because Python 3 is missing from this system.',
|
||||||
|
venv: 'because its virtual environment is missing from this system.',
|
||||||
|
}[reasonId];
|
||||||
|
|
||||||
|
super([ERROR_MESSAGE, reason, HINT].join(' '));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,8 +6,8 @@ import type { DisconnectErrorOptions } from '@/task-runners/task-broker/task-bro
|
|||||||
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
|
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
|
||||||
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
|
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
|
||||||
import { SlidingWindowSignal } from './sliding-window-signal';
|
import { SlidingWindowSignal } from './sliding-window-signal';
|
||||||
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
|
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process-base';
|
||||||
import { TaskRunnerProcess } from './task-runner-process';
|
import { JsTaskRunnerProcess } from './task-runner-process-js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Analyzes the disconnect reason of a task runner process to provide a more
|
* Analyzes the disconnect reason of a task runner process to provide a more
|
||||||
@@ -19,7 +19,7 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly runnerConfig: TaskRunnersConfig,
|
private readonly runnerConfig: TaskRunnersConfig,
|
||||||
private readonly taskRunnerProcess: TaskRunnerProcess,
|
private readonly taskRunnerProcess: JsTaskRunnerProcess,
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,8 @@ import * as a from 'node:assert/strict';
|
|||||||
|
|
||||||
import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
||||||
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
|
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
|
||||||
import type { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
import type { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js';
|
||||||
|
import type { PyTaskRunnerProcess } from '@/task-runners/task-runner-process-py';
|
||||||
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
||||||
|
|
||||||
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
|
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
|
||||||
@@ -28,9 +29,13 @@ export class TaskRunnerModule {
|
|||||||
|
|
||||||
private taskRequester: LocalTaskRequester | undefined;
|
private taskRequester: LocalTaskRequester | undefined;
|
||||||
|
|
||||||
private taskRunnerProcess: TaskRunnerProcess | undefined;
|
private jsRunnerProcess: JsTaskRunnerProcess | undefined;
|
||||||
|
|
||||||
private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;
|
private pyRunnerProcess: PyTaskRunnerProcess | undefined;
|
||||||
|
|
||||||
|
private jsRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;
|
||||||
|
|
||||||
|
private pyRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
@@ -50,17 +55,22 @@ export class TaskRunnerModule {
|
|||||||
await this.loadTaskRequester();
|
await this.loadTaskRequester();
|
||||||
await this.loadTaskBroker();
|
await this.loadTaskBroker();
|
||||||
|
|
||||||
if (mode === 'internal') {
|
if (mode === 'internal') await this.startInternalTaskRunners();
|
||||||
await this.startInternalTaskRunner();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnShutdown()
|
@OnShutdown()
|
||||||
async stop() {
|
async stop() {
|
||||||
const stopRunnerProcessTask = (async () => {
|
const stopRunnerProcessTask = (async () => {
|
||||||
if (this.taskRunnerProcess) {
|
if (this.jsRunnerProcess) {
|
||||||
await this.taskRunnerProcess.stop();
|
await this.jsRunnerProcess.stop();
|
||||||
this.taskRunnerProcess = undefined;
|
this.jsRunnerProcess = undefined;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
const stopPythonRunnerProcessTask = (async () => {
|
||||||
|
if (this.pyRunnerProcess) {
|
||||||
|
await this.pyRunnerProcess.stop();
|
||||||
|
this.pyRunnerProcess = undefined;
|
||||||
}
|
}
|
||||||
})();
|
})();
|
||||||
|
|
||||||
@@ -71,7 +81,7 @@ export class TaskRunnerModule {
|
|||||||
}
|
}
|
||||||
})();
|
})();
|
||||||
|
|
||||||
await Promise.all([stopRunnerProcessTask, stopRunnerServerTask]);
|
await Promise.all([stopRunnerProcessTask, stopPythonRunnerProcessTask, stopRunnerServerTask]);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async loadTaskRequester() {
|
private async loadTaskRequester() {
|
||||||
@@ -93,20 +103,33 @@ export class TaskRunnerModule {
|
|||||||
await this.taskBrokerHttpServer.start();
|
await this.taskBrokerHttpServer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async startInternalTaskRunner() {
|
private async startInternalTaskRunners() {
|
||||||
a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded');
|
a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded');
|
||||||
|
|
||||||
const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process');
|
const { JsTaskRunnerProcess } = await import('@/task-runners/task-runner-process-js');
|
||||||
this.taskRunnerProcess = Container.get(TaskRunnerProcess);
|
this.jsRunnerProcess = Container.get(JsTaskRunnerProcess);
|
||||||
this.taskRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
|
this.jsRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
|
||||||
this.taskRunnerProcess,
|
this.jsRunnerProcess,
|
||||||
);
|
);
|
||||||
this.taskRunnerProcessRestartLoopDetector.on(
|
this.jsRunnerProcessRestartLoopDetector.on(
|
||||||
'restart-loop-detected',
|
'restart-loop-detected',
|
||||||
this.onRunnerRestartLoopDetected,
|
this.onRunnerRestartLoopDetected,
|
||||||
);
|
);
|
||||||
|
|
||||||
await this.taskRunnerProcess.start();
|
await this.jsRunnerProcess.start();
|
||||||
|
|
||||||
|
if (process.env.N8N_NATIVE_PYTHON_RUNNER === 'true') {
|
||||||
|
const { PyTaskRunnerProcess } = await import('@/task-runners/task-runner-process-py');
|
||||||
|
this.pyRunnerProcess = Container.get(PyTaskRunnerProcess);
|
||||||
|
this.pyRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
|
||||||
|
this.pyRunnerProcess,
|
||||||
|
);
|
||||||
|
this.pyRunnerProcessRestartLoopDetector.on(
|
||||||
|
'restart-loop-detected',
|
||||||
|
this.onRunnerRestartLoopDetected,
|
||||||
|
);
|
||||||
|
await this.pyRunnerProcess.start();
|
||||||
|
}
|
||||||
|
|
||||||
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
|
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
|
||||||
'@/task-runners/internal-task-runner-disconnect-analyzer'
|
'@/task-runners/internal-task-runner-disconnect-analyzer'
|
||||||
|
|||||||
123
packages/cli/src/task-runners/task-runner-process-base.ts
Normal file
123
packages/cli/src/task-runners/task-runner-process-base.ts
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
import { Logger } from '@n8n/backend-common';
|
||||||
|
import { LogScope, TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { OnShutdown } from '@n8n/decorators';
|
||||||
|
import { Service } from '@n8n/di';
|
||||||
|
import assert from 'node:assert/strict';
|
||||||
|
import { spawn } from 'node:child_process';
|
||||||
|
|
||||||
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
|
import { forwardToLogger } from './forward-to-logger';
|
||||||
|
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
|
||||||
|
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
|
||||||
|
|
||||||
|
export type ChildProcess = ReturnType<typeof spawn>;
|
||||||
|
|
||||||
|
export type ExitReason = 'unknown' | 'oom';
|
||||||
|
|
||||||
|
export type TaskRunnerProcessEventMap = {
|
||||||
|
exit: {
|
||||||
|
reason: ExitReason;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
@Service()
|
||||||
|
export abstract class TaskRunnerProcessBase extends TypedEmitter<TaskRunnerProcessEventMap> {
|
||||||
|
protected readonly name: string;
|
||||||
|
|
||||||
|
protected readonly loggerScope: LogScope;
|
||||||
|
|
||||||
|
protected process: ChildProcess | null = null;
|
||||||
|
|
||||||
|
protected _runPromise: Promise<void> | null = null;
|
||||||
|
|
||||||
|
protected isShuttingDown = false;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
protected readonly logger: Logger,
|
||||||
|
protected readonly runnerConfig: TaskRunnersConfig,
|
||||||
|
protected readonly authService: TaskBrokerAuthService,
|
||||||
|
protected readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents,
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
this.logger = logger.scoped(this.loggerScope);
|
||||||
|
|
||||||
|
this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => {
|
||||||
|
this.logger.warn('Task runner failed heartbeat check, restarting...');
|
||||||
|
void this.forceRestart();
|
||||||
|
});
|
||||||
|
|
||||||
|
this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => {
|
||||||
|
this.logger.warn('Task runner timed out during task, restarting...');
|
||||||
|
void this.forceRestart();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
get isRunning() {
|
||||||
|
return this.process !== null;
|
||||||
|
}
|
||||||
|
|
||||||
|
get pid() {
|
||||||
|
return this.process?.pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
get runPromise() {
|
||||||
|
return this._runPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
get isInternal() {
|
||||||
|
return this.runnerConfig.mode === 'internal';
|
||||||
|
}
|
||||||
|
|
||||||
|
async start() {
|
||||||
|
assert(!this.process, `${this.name} already running`);
|
||||||
|
|
||||||
|
const grantToken = await this.authService.createGrantToken();
|
||||||
|
const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`;
|
||||||
|
|
||||||
|
this.process = await this.startProcess(grantToken, taskBrokerUri);
|
||||||
|
forwardToLogger(this.logger, this.process, `[${this.name}] `);
|
||||||
|
this.monitorProcess(this.process);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnShutdown()
|
||||||
|
async stop() {
|
||||||
|
if (!this.process) return;
|
||||||
|
|
||||||
|
this.isShuttingDown = true;
|
||||||
|
this.process.kill();
|
||||||
|
await this._runPromise;
|
||||||
|
this.isShuttingDown = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Force-restart a task runner process suspected of being unresponsive. */
|
||||||
|
protected async forceRestart() {
|
||||||
|
if (!this.process) return;
|
||||||
|
|
||||||
|
this.process.kill('SIGKILL');
|
||||||
|
await this._runPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected onProcessExit(code: number | null, resolveFn: () => void) {
|
||||||
|
this.process = null;
|
||||||
|
const exitReason = this.analyzeExitReason?.(code) ?? { reason: 'unknown' };
|
||||||
|
this.emit('exit', exitReason);
|
||||||
|
resolveFn();
|
||||||
|
if (!this.isShuttingDown) {
|
||||||
|
setImmediate(async () => await this.start());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected monitorProcess(taskRunnerProcess: ChildProcess) {
|
||||||
|
this._runPromise = new Promise((resolve) => {
|
||||||
|
this.setupProcessMonitoring?.(taskRunnerProcess);
|
||||||
|
taskRunnerProcess.on('exit', (code) => {
|
||||||
|
this.onProcessExit(code, resolve);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract startProcess(grantToken: string, taskBrokerUri: string): Promise<ChildProcess>;
|
||||||
|
setupProcessMonitoring?(process: ChildProcess): void;
|
||||||
|
analyzeExitReason?(code: number | null): { reason: ExitReason };
|
||||||
|
}
|
||||||
89
packages/cli/src/task-runners/task-runner-process-js.ts
Normal file
89
packages/cli/src/task-runners/task-runner-process-js.ts
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
import { Logger } from '@n8n/backend-common';
|
||||||
|
import { TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { Service } from '@n8n/di';
|
||||||
|
import assert from 'node:assert/strict';
|
||||||
|
import { spawn } from 'node:child_process';
|
||||||
|
import * as process from 'node:process';
|
||||||
|
|
||||||
|
import { NodeProcessOomDetector } from './node-process-oom-detector';
|
||||||
|
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
|
||||||
|
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
|
||||||
|
import { ChildProcess, ExitReason, TaskRunnerProcessBase } from './task-runner-process-base';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responsible for managing a JavaScript task runner as a child process.
|
||||||
|
* This is for internal mode, which is NOT recommended for production.
|
||||||
|
*/
|
||||||
|
@Service()
|
||||||
|
export class JsTaskRunnerProcess extends TaskRunnerProcessBase {
|
||||||
|
readonly name = 'runnner:js';
|
||||||
|
|
||||||
|
readonly loggerScope = 'task-runner-js';
|
||||||
|
|
||||||
|
private oomDetector: NodeProcessOomDetector | null = null;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
readonly logger: Logger,
|
||||||
|
readonly runnerConfig: TaskRunnersConfig,
|
||||||
|
readonly authService: TaskBrokerAuthService,
|
||||||
|
readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents,
|
||||||
|
) {
|
||||||
|
super(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||||
|
|
||||||
|
assert(this.isInternal, `${this.constructor.name} cannot be used in external mode`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async startProcess(grantToken: string, taskBrokerUri: string): Promise<ChildProcess> {
|
||||||
|
const startScript = require.resolve('@n8n/task-runner/start');
|
||||||
|
const flags = this.runnerConfig.insecureMode
|
||||||
|
? []
|
||||||
|
: ['--disallow-code-generation-from-strings', '--disable-proto=delete'];
|
||||||
|
|
||||||
|
return spawn('node', [...flags, startScript], {
|
||||||
|
env: this.getProcessEnvVars(grantToken, taskBrokerUri),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
setupProcessMonitoring(process: ChildProcess) {
|
||||||
|
this.oomDetector = new NodeProcessOomDetector(process);
|
||||||
|
}
|
||||||
|
|
||||||
|
analyzeExitReason(): { reason: ExitReason } {
|
||||||
|
return { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' };
|
||||||
|
}
|
||||||
|
|
||||||
|
private getProcessEnvVars(grantToken: string, taskBrokerUri: string) {
|
||||||
|
const envVars: Record<string, string | undefined> = {
|
||||||
|
// system environment
|
||||||
|
PATH: process.env.PATH,
|
||||||
|
HOME: process.env.HOME,
|
||||||
|
NODE_PATH: process.env.NODE_PATH,
|
||||||
|
|
||||||
|
// n8n
|
||||||
|
GENERIC_TIMEZONE: process.env.GENERIC_TIMEZONE,
|
||||||
|
NODE_FUNCTION_ALLOW_BUILTIN: process.env.NODE_FUNCTION_ALLOW_BUILTIN,
|
||||||
|
NODE_FUNCTION_ALLOW_EXTERNAL: process.env.NODE_FUNCTION_ALLOW_EXTERNAL,
|
||||||
|
|
||||||
|
// sentry
|
||||||
|
N8N_SENTRY_DSN: process.env.N8N_SENTRY_DSN,
|
||||||
|
N8N_VERSION: process.env.N8N_VERSION,
|
||||||
|
ENVIRONMENT: process.env.ENVIRONMENT,
|
||||||
|
DEPLOYMENT_NAME: process.env.DEPLOYMENT_NAME,
|
||||||
|
|
||||||
|
// runner
|
||||||
|
N8N_RUNNERS_GRANT_TOKEN: grantToken,
|
||||||
|
N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri,
|
||||||
|
N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(),
|
||||||
|
N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(),
|
||||||
|
N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(),
|
||||||
|
N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(),
|
||||||
|
N8N_RUNNERS_INSECURE_MODE: process.env.N8N_RUNNERS_INSECURE_MODE,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (this.runnerConfig.maxOldSpaceSize) {
|
||||||
|
envVars.NODE_OPTIONS = `--max-old-space-size=${this.runnerConfig.maxOldSpaceSize}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
return envVars;
|
||||||
|
}
|
||||||
|
}
|
||||||
68
packages/cli/src/task-runners/task-runner-process-py.ts
Normal file
68
packages/cli/src/task-runners/task-runner-process-py.ts
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import { Logger } from '@n8n/backend-common';
|
||||||
|
import { TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { Service } from '@n8n/di';
|
||||||
|
import { exec, spawn } from 'node:child_process';
|
||||||
|
import { access } from 'node:fs/promises';
|
||||||
|
import path from 'node:path';
|
||||||
|
import { promisify } from 'node:util';
|
||||||
|
|
||||||
|
import { MissingRequirementsError } from './errors/missing-requirements.error';
|
||||||
|
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
|
||||||
|
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
|
||||||
|
import { TaskRunnerProcessBase } from './task-runner-process-base';
|
||||||
|
|
||||||
|
const asyncExec = promisify(exec);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responsible for managing a Python task runner as a child process.
|
||||||
|
* This is for internal mode, which is NOT recommended for production.
|
||||||
|
*/
|
||||||
|
@Service()
|
||||||
|
export class PyTaskRunnerProcess extends TaskRunnerProcessBase {
|
||||||
|
protected readonly name = 'runner:py';
|
||||||
|
|
||||||
|
protected readonly loggerScope = 'task-runner-py';
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
readonly logger: Logger,
|
||||||
|
readonly runnerConfig: TaskRunnersConfig,
|
||||||
|
readonly authService: TaskBrokerAuthService,
|
||||||
|
readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents,
|
||||||
|
) {
|
||||||
|
super(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
async startProcess(grantToken: string, taskBrokerUri: string) {
|
||||||
|
try {
|
||||||
|
await asyncExec('python3 --version', { timeout: 5000 });
|
||||||
|
} catch {
|
||||||
|
throw new MissingRequirementsError('python');
|
||||||
|
}
|
||||||
|
|
||||||
|
const pythonDir = path.join(__dirname, '../../../@n8n/task-runner-python');
|
||||||
|
const venvPath = path.join(pythonDir, '.venv/bin/python');
|
||||||
|
|
||||||
|
try {
|
||||||
|
await access(venvPath);
|
||||||
|
} catch {
|
||||||
|
throw new MissingRequirementsError('venv');
|
||||||
|
}
|
||||||
|
|
||||||
|
return spawn(venvPath, ['-m', 'src.main'], {
|
||||||
|
cwd: pythonDir,
|
||||||
|
env: {
|
||||||
|
// system environment
|
||||||
|
PATH: process.env.PATH,
|
||||||
|
HOME: process.env.HOME,
|
||||||
|
|
||||||
|
// runner
|
||||||
|
N8N_RUNNERS_GRANT_TOKEN: grantToken,
|
||||||
|
N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri,
|
||||||
|
N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(),
|
||||||
|
N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(),
|
||||||
|
N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(),
|
||||||
|
N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,10 @@
|
|||||||
import { Time } from '@n8n/constants';
|
import { Time } from '@n8n/constants';
|
||||||
|
|
||||||
import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
|
||||||
import type { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
|
||||||
import { TypedEmitter } from '@/typed-emitter';
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
|
import type { TaskRunnerProcessBase } from './task-runner-process-base';
|
||||||
|
|
||||||
const MAX_RESTARTS = 5;
|
const MAX_RESTARTS = 5;
|
||||||
const RESTARTS_WINDOW = 2 * Time.seconds.toMilliseconds;
|
const RESTARTS_WINDOW = 2 * Time.seconds.toMilliseconds;
|
||||||
|
|
||||||
@@ -32,7 +33,7 @@ export class TaskRunnerProcessRestartLoopDetector extends TypedEmitter<TaskRunne
|
|||||||
/** Time when the first restart of a loop happened within a time window */
|
/** Time when the first restart of a loop happened within a time window */
|
||||||
private firstRestartedAt = Date.now();
|
private firstRestartedAt = Date.now();
|
||||||
|
|
||||||
constructor(private readonly taskRunnerProcess: TaskRunnerProcess) {
|
constructor(private readonly taskRunnerProcess: TaskRunnerProcessBase) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
this.taskRunnerProcess.on('exit', () => {
|
this.taskRunnerProcess.on('exit', () => {
|
||||||
|
|||||||
@@ -1,201 +0,0 @@
|
|||||||
import { Logger } from '@n8n/backend-common';
|
|
||||||
import { TaskRunnersConfig } from '@n8n/config';
|
|
||||||
import { OnShutdown } from '@n8n/decorators';
|
|
||||||
import { Service } from '@n8n/di';
|
|
||||||
import * as a from 'node:assert/strict';
|
|
||||||
import { spawn } from 'node:child_process';
|
|
||||||
import * as process from 'node:process';
|
|
||||||
|
|
||||||
import { forwardToLogger } from './forward-to-logger';
|
|
||||||
import { NodeProcessOomDetector } from './node-process-oom-detector';
|
|
||||||
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
|
|
||||||
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
|
|
||||||
import { TypedEmitter } from '../typed-emitter';
|
|
||||||
|
|
||||||
type ChildProcess = ReturnType<typeof spawn>;
|
|
||||||
|
|
||||||
export type ExitReason = 'unknown' | 'oom';
|
|
||||||
|
|
||||||
export type TaskRunnerProcessEventMap = {
|
|
||||||
exit: {
|
|
||||||
reason: ExitReason;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manages the JS task runner process as a child process
|
|
||||||
*/
|
|
||||||
@Service()
|
|
||||||
export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|
||||||
get isRunning() {
|
|
||||||
return this.process !== null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** The process ID of the task runner process */
|
|
||||||
get pid() {
|
|
||||||
return this.process?.pid;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Promise that resolves when the process has exited */
|
|
||||||
get runPromise() {
|
|
||||||
return this._runPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
private process: ChildProcess | null = null;
|
|
||||||
|
|
||||||
private _runPromise: Promise<void> | null = null;
|
|
||||||
|
|
||||||
private oomDetector: NodeProcessOomDetector | null = null;
|
|
||||||
|
|
||||||
private isShuttingDown = false;
|
|
||||||
|
|
||||||
private logger: Logger;
|
|
||||||
|
|
||||||
/** Environment variables inherited by the child process from the current environment. */
|
|
||||||
private readonly passthroughEnvVars = [
|
|
||||||
'PATH',
|
|
||||||
'HOME', // So home directory can be resolved correctly
|
|
||||||
'GENERIC_TIMEZONE',
|
|
||||||
'NODE_FUNCTION_ALLOW_BUILTIN',
|
|
||||||
'NODE_FUNCTION_ALLOW_EXTERNAL',
|
|
||||||
'N8N_SENTRY_DSN',
|
|
||||||
'N8N_RUNNERS_INSECURE_MODE',
|
|
||||||
// Metadata about the environment
|
|
||||||
'N8N_VERSION',
|
|
||||||
'ENVIRONMENT',
|
|
||||||
'DEPLOYMENT_NAME',
|
|
||||||
'NODE_PATH',
|
|
||||||
] as const;
|
|
||||||
|
|
||||||
private readonly mode: 'insecure' | 'secure' = 'secure';
|
|
||||||
|
|
||||||
constructor(
|
|
||||||
logger: Logger,
|
|
||||||
private readonly runnerConfig: TaskRunnersConfig,
|
|
||||||
private readonly authService: TaskBrokerAuthService,
|
|
||||||
private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents,
|
|
||||||
) {
|
|
||||||
super();
|
|
||||||
|
|
||||||
a.ok(
|
|
||||||
this.runnerConfig.mode !== 'external',
|
|
||||||
'Task Runner Process cannot be used in external mode',
|
|
||||||
);
|
|
||||||
|
|
||||||
this.mode = this.runnerConfig.insecureMode ? 'insecure' : 'secure';
|
|
||||||
|
|
||||||
this.logger = logger.scoped('task-runner');
|
|
||||||
|
|
||||||
this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => {
|
|
||||||
this.logger.warn('Task runner failed heartbeat check, restarting...');
|
|
||||||
void this.forceRestart();
|
|
||||||
});
|
|
||||||
|
|
||||||
this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => {
|
|
||||||
this.logger.warn('Task runner timed out during task, restarting...');
|
|
||||||
void this.forceRestart();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async start() {
|
|
||||||
a.ok(!this.process, 'Task Runner Process already running');
|
|
||||||
|
|
||||||
const grantToken = await this.authService.createGrantToken();
|
|
||||||
|
|
||||||
const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`;
|
|
||||||
this.process = this.startNode(grantToken, taskBrokerUri);
|
|
||||||
|
|
||||||
forwardToLogger(this.logger, this.process, '[Task Runner]: ');
|
|
||||||
|
|
||||||
this.monitorProcess(this.process);
|
|
||||||
}
|
|
||||||
|
|
||||||
startNode(grantToken: string, taskBrokerUri: string) {
|
|
||||||
const startScript = require.resolve('@n8n/task-runner/start');
|
|
||||||
|
|
||||||
const flags =
|
|
||||||
this.mode === 'secure'
|
|
||||||
? ['--disallow-code-generation-from-strings', '--disable-proto=delete']
|
|
||||||
: [];
|
|
||||||
|
|
||||||
return spawn('node', [...flags, startScript], {
|
|
||||||
env: this.getProcessEnvVars(grantToken, taskBrokerUri),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnShutdown()
|
|
||||||
async stop() {
|
|
||||||
if (!this.process) return;
|
|
||||||
|
|
||||||
this.isShuttingDown = true;
|
|
||||||
|
|
||||||
// TODO: Timeout & force kill
|
|
||||||
this.killNode();
|
|
||||||
await this._runPromise;
|
|
||||||
|
|
||||||
this.isShuttingDown = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Force-restart a runner suspected of being unresponsive. */
|
|
||||||
async forceRestart() {
|
|
||||||
if (!this.process) return;
|
|
||||||
|
|
||||||
this.process.kill('SIGKILL');
|
|
||||||
|
|
||||||
await this._runPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
killNode() {
|
|
||||||
if (!this.process) return;
|
|
||||||
|
|
||||||
this.process.kill();
|
|
||||||
}
|
|
||||||
|
|
||||||
private monitorProcess(taskRunnerProcess: ChildProcess) {
|
|
||||||
this._runPromise = new Promise((resolve) => {
|
|
||||||
this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess);
|
|
||||||
|
|
||||||
taskRunnerProcess.on('exit', (code) => {
|
|
||||||
this.onProcessExit(code, resolve);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private onProcessExit(_code: number | null, resolveFn: () => void) {
|
|
||||||
this.process = null;
|
|
||||||
this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' });
|
|
||||||
resolveFn();
|
|
||||||
|
|
||||||
if (!this.isShuttingDown) {
|
|
||||||
setImmediate(async () => await this.start());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private getProcessEnvVars(grantToken: string, taskBrokerUri: string) {
|
|
||||||
const envVars: Record<string, string> = {
|
|
||||||
N8N_RUNNERS_GRANT_TOKEN: grantToken,
|
|
||||||
N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri,
|
|
||||||
N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(),
|
|
||||||
N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(),
|
|
||||||
N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(),
|
|
||||||
N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(),
|
|
||||||
...this.getPassthroughEnvVars(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if (this.runnerConfig.maxOldSpaceSize) {
|
|
||||||
envVars.NODE_OPTIONS = `--max-old-space-size=${this.runnerConfig.maxOldSpaceSize}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
return envVars;
|
|
||||||
}
|
|
||||||
|
|
||||||
private getPassthroughEnvVars() {
|
|
||||||
return this.passthroughEnvVars.reduce<Record<string, string>>((env, key) => {
|
|
||||||
if (process.env[key]) {
|
|
||||||
env[key] = process.env[key];
|
|
||||||
}
|
|
||||||
|
|
||||||
return env;
|
|
||||||
}, {});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -18,7 +18,7 @@ import { Publisher } from '@/scaling/pubsub/publisher.service';
|
|||||||
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
|
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
|
||||||
import { ScalingService } from '@/scaling/scaling.service';
|
import { ScalingService } from '@/scaling/scaling.service';
|
||||||
import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server';
|
import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server';
|
||||||
import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js';
|
||||||
import { Telemetry } from '@/telemetry';
|
import { Telemetry } from '@/telemetry';
|
||||||
import { setupTestCommand } from '@test-integration/utils/test-command';
|
import { setupTestCommand } from '@test-integration/utils/test-command';
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ const messageEventBus = mockInstance(MessageEventBus);
|
|||||||
const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
|
const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
|
||||||
const scalingService = mockInstance(ScalingService);
|
const scalingService = mockInstance(ScalingService);
|
||||||
const taskBrokerServer = mockInstance(TaskBrokerServer);
|
const taskBrokerServer = mockInstance(TaskBrokerServer);
|
||||||
const taskRunnerProcess = mockInstance(TaskRunnerProcess);
|
const taskRunnerProcess = mockInstance(JsTaskRunnerProcess);
|
||||||
mockInstance(Publisher);
|
mockInstance(Publisher);
|
||||||
mockInstance(Subscriber);
|
mockInstance(Subscriber);
|
||||||
mockInstance(Telemetry);
|
mockInstance(Telemetry);
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import { Container } from '@n8n/di';
|
|||||||
|
|
||||||
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
|
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
|
||||||
import { TaskBroker } from '@/task-runners/task-broker/task-broker.service';
|
import { TaskBroker } from '@/task-runners/task-broker/task-broker.service';
|
||||||
import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
|
import { JsTaskRunnerProcess } from '@/task-runners/task-runner-process-js';
|
||||||
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
||||||
import { retryUntil } from '@test-integration/retry-until';
|
import { retryUntil } from '@test-integration/retry-until';
|
||||||
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
|
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
|
||||||
@@ -11,7 +11,7 @@ describe('TaskRunnerProcess', () => {
|
|||||||
const { config, server: taskRunnerServer } = setupBrokerTestServer({
|
const { config, server: taskRunnerServer } = setupBrokerTestServer({
|
||||||
mode: 'internal',
|
mode: 'internal',
|
||||||
});
|
});
|
||||||
const runnerProcess = Container.get(TaskRunnerProcess);
|
const runnerProcess = Container.get(JsTaskRunnerProcess);
|
||||||
const taskBroker = Container.get(TaskBroker);
|
const taskBroker = Container.get(TaskBroker);
|
||||||
const taskRunnerService = Container.get(TaskBrokerWsServer);
|
const taskRunnerService = Container.get(TaskBrokerWsServer);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user