diff --git a/docker/images/n8n/n8n-task-runners.json b/docker/images/n8n/n8n-task-runners.json index a37c59fccb..d9575997c0 100644 --- a/docker/images/n8n/n8n-task-runners.json +++ b/docker/images/n8n/n8n-task-runners.json @@ -9,12 +9,12 @@ "PATH", "GENERIC_TIMEZONE", "N8N_RUNNERS_GRANT_TOKEN", - "N8N_RUNNERS_N8N_URI", + "N8N_RUNNERS_TASK_BROKER_URI", "N8N_RUNNERS_MAX_PAYLOAD", "N8N_RUNNERS_MAX_CONCURRENCY", - "N8N_RUNNERS_SERVER_ENABLED", - "N8N_RUNNERS_SERVER_HOST", - "N8N_RUNNERS_SERVER_PORT", + "N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED", + "N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST", + "N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT", "NODE_FUNCTION_ALLOW_BUILTIN", "NODE_FUNCTION_ALLOW_EXTERNAL", "NODE_OPTIONS", diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index d3fca6da08..06e262fe49 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -24,7 +24,7 @@ export class TaskRunnersConfig { authToken: string = ''; /** IP address task runners server should listen on */ - @Env('N8N_RUNNERS_SERVER_PORT') + @Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT') port: number = 5679; /** IP address task runners server should listen on */ diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts index d70f7e2ee8..a1059adf4b 100644 --- a/packages/@n8n/task-runner/src/config/base-runner-config.ts +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -2,20 +2,20 @@ import { Config, Env, Nested } from '@n8n/config'; @Config class HealthcheckServerConfig { - @Env('N8N_RUNNERS_SERVER_ENABLED') + @Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED') enabled: boolean = false; - @Env('N8N_RUNNERS_SERVER_HOST') + @Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST') host: string = '127.0.0.1'; - @Env('N8N_RUNNERS_SERVER_PORT') + @Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT') port: number = 5681; } @Config export class BaseRunnerConfig { - @Env('N8N_RUNNERS_N8N_URI') - n8nUri: string = '127.0.0.1:5679'; + @Env('N8N_RUNNERS_TASK_BROKER_URI') + taskBrokerUri: string = 'http://127.0.0.1:5679'; @Env('N8N_RUNNERS_GRANT_TOKEN') grantToken: string = ''; diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index a99e8b9f07..439de19eac 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -34,7 +34,7 @@ describe('JsTaskRunner', () => { ...defaultConfig.baseRunnerConfig, grantToken: 'grantToken', maxConcurrency: 1, - n8nUri: 'localhost', + taskBrokerUri: 'http://localhost', ...baseRunnerOpts, }, jsRunnerConfig: { @@ -311,10 +311,10 @@ describe('JsTaskRunner', () => { }); it("should not expose task runner's env variables even if no env state is received", async () => { - process.env.N8N_RUNNERS_N8N_URI = 'http://127.0.0.1:5679'; + process.env.N8N_RUNNERS_TASK_BROKER_URI = 'http://127.0.0.1:5679'; const outcome = await execTaskWithParams({ task: newTaskWithSettings({ - code: 'return { val: $env.N8N_RUNNERS_N8N_URI }', + code: 'return { val: $env.N8N_RUNNERS_TASK_BROKER_URI }', nodeMode: 'runOnceForAllItems', }), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts new file mode 100644 index 0000000000..c633e95688 --- /dev/null +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts @@ -0,0 +1,89 @@ +import { WebSocket } from 'ws'; + +import { TaskRunner } from '@/task-runner'; + +class TestRunner extends TaskRunner {} + +jest.mock('ws'); + +describe('TestRunner', () => { + describe('constructor', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should correctly construct WebSocket URI with provided taskBrokerUri', () => { + const runner = new TestRunner({ + taskType: 'test-task', + maxConcurrency: 5, + idleTimeout: 60, + grantToken: 'test-token', + maxPayloadSize: 1024, + taskBrokerUri: 'http://localhost:8080', + timezone: 'America/New_York', + healthcheckServer: { + enabled: false, + host: 'localhost', + port: 8081, + }, + }); + + expect(WebSocket).toHaveBeenCalledWith( + `ws://localhost:8080/runners/_ws?id=${runner.id}`, + expect.objectContaining({ + headers: { + authorization: 'Bearer test-token', + }, + maxPayload: 1024, + }), + ); + }); + + it('should handle different taskBrokerUri formats correctly', () => { + const runner = new TestRunner({ + taskType: 'test-task', + maxConcurrency: 5, + idleTimeout: 60, + grantToken: 'test-token', + maxPayloadSize: 1024, + taskBrokerUri: 'https://example.com:3000/path', + timezone: 'America/New_York', + healthcheckServer: { + enabled: false, + host: 'localhost', + port: 8081, + }, + }); + + expect(WebSocket).toHaveBeenCalledWith( + `ws://example.com:3000/runners/_ws?id=${runner.id}`, + expect.objectContaining({ + headers: { + authorization: 'Bearer test-token', + }, + maxPayload: 1024, + }), + ); + }); + + it('should throw an error if taskBrokerUri is invalid', () => { + expect( + () => + new TestRunner({ + taskType: 'test-task', + maxConcurrency: 5, + idleTimeout: 60, + grantToken: 'test-token', + maxPayloadSize: 1024, + taskBrokerUri: 'not-a-valid-uri', + timezone: 'America/New_York', + healthcheckServer: { + enabled: false, + host: 'localhost', + port: 8081, + }, + }), + ).toThrowError(/Invalid URL/); + }); + }); +}); diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 1486af280d..f0af115b5a 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -92,7 +92,9 @@ export abstract class TaskRunner extends EventEmitter { this.maxConcurrency = opts.maxConcurrency; this.idleTimeout = opts.idleTimeout; - const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`; + const { host: taskBrokerHost } = new URL(opts.taskBrokerUri); + + const wsUrl = `ws://${taskBrokerHost}/runners/_ws?id=${this.id}`; this.ws = new WebSocket(wsUrl, { headers: { authorization: `Bearer ${opts.grantToken}`, @@ -109,11 +111,11 @@ export abstract class TaskRunner extends EventEmitter { ['ECONNREFUSED', 'ENOTFOUND'].some((code) => code === error.code) ) { console.error( - `Error: Failed to connect to n8n. Please ensure n8n is reachable at: ${opts.n8nUri}`, + `Error: Failed to connect to n8n task broker. Please ensure n8n task broker is reachable at: ${taskBrokerHost}`, ); process.exit(1); } else { - console.error(`Error: Failed to connect to n8n at ${opts.n8nUri}`); + console.error(`Error: Failed to connect to n8n task broker at ${taskBrokerHost}`); console.error('Details:', event.message || 'Unknown error'); } }); diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index e33157a286..d989107718 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -95,19 +95,19 @@ export class TaskRunnerProcess extends TypedEmitter { const grantToken = await this.authService.createGrantToken(); - const n8nUri = `127.0.0.1:${this.runnerConfig.port}`; - this.process = this.startNode(grantToken, n8nUri); + const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`; + this.process = this.startNode(grantToken, taskBrokerUri); forwardToLogger(this.logger, this.process, '[Task Runner]: '); this.monitorProcess(this.process); } - startNode(grantToken: string, n8nUri: string) { + startNode(grantToken: string, taskBrokerUri: string) { const startScript = require.resolve('@n8n/task-runner/start'); return spawn('node', [startScript], { - env: this.getProcessEnvVars(grantToken, n8nUri), + env: this.getProcessEnvVars(grantToken, taskBrokerUri), }); } @@ -159,10 +159,10 @@ export class TaskRunnerProcess extends TypedEmitter { } } - private getProcessEnvVars(grantToken: string, n8nUri: string) { + private getProcessEnvVars(grantToken: string, taskBrokerUri: string) { const envVars: Record = { N8N_RUNNERS_GRANT_TOKEN: grantToken, - N8N_RUNNERS_N8N_URI: n8nUri, + N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri, N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), ...this.getPassthroughEnvVars(),