refactor: Unify task runner env vars (#12040)

This commit is contained in:
Tomi Turtiainen
2024-12-05 12:36:43 +02:00
committed by GitHub
parent 0537524c3e
commit 2e979547a4
7 changed files with 113 additions and 22 deletions

View File

@@ -9,12 +9,12 @@
"PATH",
"GENERIC_TIMEZONE",
"N8N_RUNNERS_GRANT_TOKEN",
"N8N_RUNNERS_N8N_URI",
"N8N_RUNNERS_TASK_BROKER_URI",
"N8N_RUNNERS_MAX_PAYLOAD",
"N8N_RUNNERS_MAX_CONCURRENCY",
"N8N_RUNNERS_SERVER_ENABLED",
"N8N_RUNNERS_SERVER_HOST",
"N8N_RUNNERS_SERVER_PORT",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT",
"NODE_FUNCTION_ALLOW_BUILTIN",
"NODE_FUNCTION_ALLOW_EXTERNAL",
"NODE_OPTIONS",

View File

@@ -24,7 +24,7 @@ export class TaskRunnersConfig {
authToken: string = '';
/** IP address task runners server should listen on */
@Env('N8N_RUNNERS_SERVER_PORT')
@Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT')
port: number = 5679;
/** IP address task runners server should listen on */

View File

@@ -2,20 +2,20 @@ import { Config, Env, Nested } from '@n8n/config';
@Config
class HealthcheckServerConfig {
@Env('N8N_RUNNERS_SERVER_ENABLED')
@Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED')
enabled: boolean = false;
@Env('N8N_RUNNERS_SERVER_HOST')
@Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST')
host: string = '127.0.0.1';
@Env('N8N_RUNNERS_SERVER_PORT')
@Env('N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT')
port: number = 5681;
}
@Config
export class BaseRunnerConfig {
@Env('N8N_RUNNERS_N8N_URI')
n8nUri: string = '127.0.0.1:5679';
@Env('N8N_RUNNERS_TASK_BROKER_URI')
taskBrokerUri: string = 'http://127.0.0.1:5679';
@Env('N8N_RUNNERS_GRANT_TOKEN')
grantToken: string = '';

View File

@@ -34,7 +34,7 @@ describe('JsTaskRunner', () => {
...defaultConfig.baseRunnerConfig,
grantToken: 'grantToken',
maxConcurrency: 1,
n8nUri: 'localhost',
taskBrokerUri: 'http://localhost',
...baseRunnerOpts,
},
jsRunnerConfig: {
@@ -311,10 +311,10 @@ describe('JsTaskRunner', () => {
});
it("should not expose task runner's env variables even if no env state is received", async () => {
process.env.N8N_RUNNERS_N8N_URI = 'http://127.0.0.1:5679';
process.env.N8N_RUNNERS_TASK_BROKER_URI = 'http://127.0.0.1:5679';
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
code: 'return { val: $env.N8N_RUNNERS_TASK_BROKER_URI }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {

View File

@@ -0,0 +1,89 @@
import { WebSocket } from 'ws';
import { TaskRunner } from '@/task-runner';
class TestRunner extends TaskRunner {}
jest.mock('ws');
describe('TestRunner', () => {
describe('constructor', () => {
afterEach(() => {
jest.clearAllMocks();
});
it('should correctly construct WebSocket URI with provided taskBrokerUri', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
`ws://localhost:8080/runners/_ws?id=${runner.id}`,
expect.objectContaining({
headers: {
authorization: 'Bearer test-token',
},
maxPayload: 1024,
}),
);
});
it('should handle different taskBrokerUri formats correctly', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
`ws://example.com:3000/runners/_ws?id=${runner.id}`,
expect.objectContaining({
headers: {
authorization: 'Bearer test-token',
},
maxPayload: 1024,
}),
);
});
it('should throw an error if taskBrokerUri is invalid', () => {
expect(
() =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
}),
).toThrowError(/Invalid URL/);
});
});
});

View File

@@ -92,7 +92,9 @@ export abstract class TaskRunner extends EventEmitter {
this.maxConcurrency = opts.maxConcurrency;
this.idleTimeout = opts.idleTimeout;
const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`;
const { host: taskBrokerHost } = new URL(opts.taskBrokerUri);
const wsUrl = `ws://${taskBrokerHost}/runners/_ws?id=${this.id}`;
this.ws = new WebSocket(wsUrl, {
headers: {
authorization: `Bearer ${opts.grantToken}`,
@@ -109,11 +111,11 @@ export abstract class TaskRunner extends EventEmitter {
['ECONNREFUSED', 'ENOTFOUND'].some((code) => code === error.code)
) {
console.error(
`Error: Failed to connect to n8n. Please ensure n8n is reachable at: ${opts.n8nUri}`,
`Error: Failed to connect to n8n task broker. Please ensure n8n task broker is reachable at: ${taskBrokerHost}`,
);
process.exit(1);
} else {
console.error(`Error: Failed to connect to n8n at ${opts.n8nUri}`);
console.error(`Error: Failed to connect to n8n task broker at ${taskBrokerHost}`);
console.error('Details:', event.message || 'Unknown error');
}
});

View File

@@ -95,19 +95,19 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
const grantToken = await this.authService.createGrantToken();
const n8nUri = `127.0.0.1:${this.runnerConfig.port}`;
this.process = this.startNode(grantToken, n8nUri);
const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`;
this.process = this.startNode(grantToken, taskBrokerUri);
forwardToLogger(this.logger, this.process, '[Task Runner]: ');
this.monitorProcess(this.process);
}
startNode(grantToken: string, n8nUri: string) {
startNode(grantToken: string, taskBrokerUri: string) {
const startScript = require.resolve('@n8n/task-runner/start');
return spawn('node', [startScript], {
env: this.getProcessEnvVars(grantToken, n8nUri),
env: this.getProcessEnvVars(grantToken, taskBrokerUri),
});
}
@@ -159,10 +159,10 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
}
}
private getProcessEnvVars(grantToken: string, n8nUri: string) {
private getProcessEnvVars(grantToken: string, taskBrokerUri: string) {
const envVars: Record<string, string> = {
N8N_RUNNERS_GRANT_TOKEN: grantToken,
N8N_RUNNERS_N8N_URI: n8nUri,
N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri,
N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(),
N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(),
...this.getPassthroughEnvVars(),