From 74fa259b371c8d41ca2a8b0162d62cab353a76f0 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 2 Oct 2024 15:16:02 +0300 Subject: [PATCH] feat: Make task runners work with n8n from npm (no-changelog) (#11015) --- .../@n8n/task-runner-node-js/package.json | 8 +- .../@n8n/task-runner-node-js/src/start.ts | 26 ++++-- packages/cli/src/commands/start.ts | 3 + packages/cli/src/runners/runner-ws-server.ts | 13 +-- .../cli/src/runners/task-runner-process.ts | 89 +++++++++++++++++++ .../runners/task-runner-process.test.ts | 81 +++++++++++++++++ .../shared/utils/task-runner-test-server.ts | 43 +++++++++ 7 files changed, 245 insertions(+), 18 deletions(-) create mode 100644 packages/cli/src/runners/task-runner-process.ts create mode 100644 packages/cli/test/integration/runners/task-runner-process.test.ts create mode 100644 packages/cli/test/integration/shared/utils/task-runner-test-server.ts diff --git a/packages/@n8n/task-runner-node-js/package.json b/packages/@n8n/task-runner-node-js/package.json index ee94e34399..d7f397c9e4 100644 --- a/packages/@n8n/task-runner-node-js/package.json +++ b/packages/@n8n/task-runner-node-js/package.json @@ -3,7 +3,7 @@ "private": true, "version": "0.1.0", "description": "", - "main": "dist/index.js", + "main": "dist/start.js", "scripts": { "start": "node dist/start.js", "dev": "pnpm build && pnpm start", @@ -23,9 +23,9 @@ "package.json", "tsconfig.json" ], - "main": "dist/index.js", - "module": "src/index.ts", - "types": "dist/index.d.ts", + "main": "dist/start.js", + "module": "src/start.ts", + "types": "dist/start.d.ts", "packageManager": "pnpm@9.6.0", "devDependencies": { "@n8n_io/eslint-config": "^0.0.2", diff --git a/packages/@n8n/task-runner-node-js/src/start.ts b/packages/@n8n/task-runner-node-js/src/start.ts index ac402dd1c0..b845000b9c 100644 --- a/packages/@n8n/task-runner-node-js/src/start.ts +++ b/packages/@n8n/task-runner-node-js/src/start.ts @@ -1,5 +1,4 @@ import * as a from 'node:assert/strict'; - import { JsTaskRunner } from './code'; import { authenticate } from './authenticator'; @@ -7,28 +6,39 @@ let _runner: JsTaskRunner; type Config = { n8nUri: string; - authToken: string; + authToken?: string; + grantToken?: string; }; function readAndParseConfig(): Config { const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN; - a.ok(authToken, 'Missing task runner auth token. Use N8N_RUNNERS_AUTH_TOKEN to configure it'); + const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN; + if (!authToken && !grantToken) { + throw new Error( + 'Missing task runner authentication. Use either N8N_RUNNERS_AUTH_TOKEN or N8N_RUNNERS_GRANT_TOKEN to configure it', + ); + } return { n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678', authToken, + grantToken, }; } void (async function start() { const config = readAndParseConfig(); - const grantToken = await authenticate({ - authToken: config.authToken, - n8nUri: config.n8nUri, - }); + let grantToken = config.grantToken; + if (!grantToken) { + a.ok(config.authToken); + + grantToken = await authenticate({ + authToken: config.authToken, + n8nUri: config.n8nUri, + }); + } const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`; - _runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5); })(); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 755eecec57..8dcdfe5a0e 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -225,6 +225,9 @@ export class Start extends BaseCommand { if (!this.globalConfig.taskRunners.disabled) { Container.set(TaskManager, new SingleMainTaskManager()); + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); + const runnerProcess = Container.get(TaskRunnerProcess); + await runnerProcess.start(); } } diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index e9b824499d..ef9e52f5f5 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -44,7 +44,7 @@ function getWsEndpoint(restEndpoint: string) { @Service() export class TaskRunnerService { - runnerConnections: Record = {}; + runnerConnections: Map = new Map(); constructor( private readonly logger: Logger, @@ -52,7 +52,7 @@ export class TaskRunnerService { ) {} sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) { - this.runnerConnections[id]?.send(JSON.stringify(message)); + this.runnerConnections.get(id)?.send(JSON.stringify(message)); } add(id: TaskRunner['id'], connection: WebSocket) { @@ -75,7 +75,7 @@ export class TaskRunnerService { this.removeConnection(id); isConnected = true; - this.runnerConnections[id] = connection; + this.runnerConnections.set(id, connection); this.taskBroker.registerRunner( { @@ -117,10 +117,11 @@ export class TaskRunnerService { } removeConnection(id: TaskRunner['id']) { - if (id in this.runnerConnections) { + const connection = this.runnerConnections.get(id); + if (connection) { this.taskBroker.deregisterRunner(id); - this.runnerConnections[id].close(); - delete this.runnerConnections[id]; + connection.close(); + this.runnerConnections.delete(id); } } diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts new file mode 100644 index 0000000000..3bee225649 --- /dev/null +++ b/packages/cli/src/runners/task-runner-process.ts @@ -0,0 +1,89 @@ +import { GlobalConfig } from '@n8n/config'; +import * as a from 'node:assert/strict'; +import { spawn } from 'node:child_process'; +import { Service } from 'typedi'; + +import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; +import { OnShutdown } from '../decorators/on-shutdown'; + +type ChildProcess = ReturnType; + +/** + * Manages the JS task runner process as a child process + */ +@Service() +export class TaskRunnerProcess { + public get isRunning() { + return this.process !== null; + } + + /** The process ID of the task runner process */ + public get pid() { + return this.process?.pid; + } + + private process: ChildProcess | null = null; + + /** Promise that resolves after the process has exited */ + private runPromise: Promise | null = null; + + private isShuttingDown = false; + + constructor( + private readonly globalConfig: GlobalConfig, + private readonly authService: TaskRunnerAuthService, + ) {} + + async start() { + a.ok(!this.process, 'Task Runner Process already running'); + + const grantToken = await this.authService.createGrantToken(); + const startScript = require.resolve('@n8n/task-runner'); + + this.process = spawn('node', [startScript], { + env: { + PATH: process.env.PATH, + N8N_RUNNERS_GRANT_TOKEN: grantToken, + N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.port}`, + }, + }); + + this.process.stdout?.pipe(process.stdout); + this.process.stderr?.pipe(process.stderr); + + this.monitorProcess(this.process); + } + + @OnShutdown() + async stop() { + if (!this.process) { + return; + } + + this.isShuttingDown = true; + + // TODO: Timeout & force kill + this.process.kill(); + await this.runPromise; + + this.isShuttingDown = false; + } + + private monitorProcess(process: ChildProcess) { + this.runPromise = new Promise((resolve) => { + process.on('exit', (code) => { + this.onProcessExit(code, resolve); + }); + }); + } + + private onProcessExit(_code: number | null, resolveFn: () => void) { + this.process = null; + resolveFn(); + + // If we are not shutting down, restart the process + if (!this.isShuttingDown) { + setImmediate(async () => await this.start()); + } + } +} diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts new file mode 100644 index 0000000000..57e3c1d480 --- /dev/null +++ b/packages/cli/test/integration/runners/task-runner-process.test.ts @@ -0,0 +1,81 @@ +import { GlobalConfig } from '@n8n/config'; +import Container from 'typedi'; + +import { TaskRunnerService } from '@/runners/runner-ws-server'; +import { TaskBroker } from '@/runners/task-broker.service'; +import { TaskRunnerProcess } from '@/runners/task-runner-process'; +import { retryUntil } from '@test-integration/retry-until'; +import { setupTaskRunnerTestServer } from '@test-integration/utils/task-runner-test-server'; + +describe('TaskRunnerProcess', () => { + const authToken = 'token'; + const globalConfig = Container.get(GlobalConfig); + globalConfig.taskRunners.authToken = authToken; + const testServer = setupTaskRunnerTestServer({}); + globalConfig.port = testServer.port; + + const runnerProcess = Container.get(TaskRunnerProcess); + const taskBroker = Container.get(TaskBroker); + const taskRunnerService = Container.get(TaskRunnerService); + + afterEach(async () => { + await runnerProcess.stop(); + }); + + const getNumConnectedRunners = () => taskRunnerService.runnerConnections.size; + const getNumRegisteredRunners = () => taskBroker.getKnownRunners().size; + + it('should start and connect the task runner', async () => { + // Act + await runnerProcess.start(); + + // Assert + expect(runnerProcess.isRunning).toBeTruthy(); + + // Wait until the runner has connected + await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); + expect(getNumRegisteredRunners()).toBe(1); + }); + + it('should stop an disconnect the task runner', async () => { + // Arrange + await runnerProcess.start(); + + // Wait until the runner has connected + await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); + expect(getNumRegisteredRunners()).toBe(1); + + // Act + await runnerProcess.stop(); + + // Assert + // Wait until the runner has disconnected + await retryUntil(() => expect(getNumConnectedRunners()).toBe(0)); + + expect(runnerProcess.isRunning).toBeFalsy(); + expect(getNumRegisteredRunners()).toBe(0); + }); + + it('should restart the task runner if it exits', async () => { + // Arrange + await runnerProcess.start(); + + // Wait until the runner has connected + await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); + const processId = runnerProcess.pid; + + // Act + // @ts-expect-error private property + runnerProcess.process?.kill('SIGKILL'); + + // Assert + // Wait until the runner is running again + await retryUntil(() => expect(runnerProcess.isRunning).toBeTruthy()); + expect(runnerProcess.pid).not.toBe(processId); + + // Wait until the runner has connected again + await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); + expect(getNumConnectedRunners()).toBe(1); + expect(getNumRegisteredRunners()).toBe(1); + }); +}); diff --git a/packages/cli/test/integration/shared/utils/task-runner-test-server.ts b/packages/cli/test/integration/shared/utils/task-runner-test-server.ts new file mode 100644 index 0000000000..3eba046765 --- /dev/null +++ b/packages/cli/test/integration/shared/utils/task-runner-test-server.ts @@ -0,0 +1,43 @@ +import { GlobalConfig } from '@n8n/config'; +import cookieParser from 'cookie-parser'; +import type { Application } from 'express'; +import express from 'express'; +import type { Server } from 'node:http'; +import type { AddressInfo } from 'node:net'; +import Container from 'typedi'; + +import { rawBodyReader } from '@/middlewares'; +import { setupRunnerHandler, setupRunnerServer } from '@/runners/runner-ws-server'; + +export interface TaskRunnerTestServer { + app: Application; + httpServer: Server; + port: number; +} + +/** + * Sets up a task runner HTTP & WS server for testing purposes + */ +export const setupTaskRunnerTestServer = ({}): TaskRunnerTestServer => { + const app = express(); + app.use(rawBodyReader); + app.use(cookieParser()); + + const testServer: TaskRunnerTestServer = { + app, + httpServer: app.listen(0), + port: 0, + }; + + testServer.port = (testServer.httpServer.address() as AddressInfo).port; + + const globalConfig = Container.get(GlobalConfig); + setupRunnerServer(globalConfig.endpoints.rest, testServer.httpServer, testServer.app); + setupRunnerHandler(globalConfig.endpoints.rest, testServer.app); + + afterAll(async () => { + testServer.httpServer.close(); + }); + + return testServer; +};