mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
feat: Initial Code Task Runners support (no-changelog) (#10698)
Co-authored-by: Iván Ovejero <ivov.src@gmail.com> Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
14
packages/@n8n/config/src/configs/runners.config.ts
Normal file
14
packages/@n8n/config/src/configs/runners.config.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { Config, Env } from '../decorators';
|
||||
|
||||
@Config
|
||||
export class TaskRunnersConfig {
|
||||
// Defaults to true for now
|
||||
@Env('N8N_RUNNERS_DISABLED')
|
||||
disabled: boolean = true;
|
||||
|
||||
@Env('N8N_RUNNERS_PATH')
|
||||
path: string = '/runners';
|
||||
|
||||
@Env('N8N_RUNNERS_AUTH_TOKEN')
|
||||
authToken: string = '';
|
||||
}
|
||||
@@ -8,6 +8,8 @@ import { ExternalStorageConfig } from './configs/external-storage.config';
|
||||
import { LoggingConfig } from './configs/logging.config';
|
||||
import { NodesConfig } from './configs/nodes.config';
|
||||
import { PublicApiConfig } from './configs/public-api.config';
|
||||
import { TaskRunnersConfig } from './configs/runners.config';
|
||||
export { TaskRunnersConfig } from './configs/runners.config';
|
||||
import { ScalingModeConfig } from './configs/scaling-mode.config';
|
||||
import { SentryConfig } from './configs/sentry.config';
|
||||
import { TemplatesConfig } from './configs/templates.config';
|
||||
@@ -85,4 +87,7 @@ export class GlobalConfig {
|
||||
|
||||
@Nested
|
||||
logging: LoggingConfig;
|
||||
|
||||
@Nested
|
||||
taskRunners: TaskRunnersConfig;
|
||||
}
|
||||
|
||||
@@ -221,6 +221,11 @@ describe('GlobalConfig', () => {
|
||||
},
|
||||
},
|
||||
},
|
||||
taskRunners: {
|
||||
disabled: true,
|
||||
path: '/runners',
|
||||
authToken: '',
|
||||
},
|
||||
sentry: {
|
||||
backendDsn: '',
|
||||
frontendDsn: '',
|
||||
|
||||
19
packages/@n8n/task-runner-node-js/.eslintrc.js
Normal file
19
packages/@n8n/task-runner-node-js/.eslintrc.js
Normal file
@@ -0,0 +1,19 @@
|
||||
const sharedOptions = require('@n8n_io/eslint-config/shared');
|
||||
|
||||
/**
|
||||
* @type {import('@types/eslint').ESLint.ConfigData}
|
||||
*/
|
||||
module.exports = {
|
||||
extends: ['@n8n_io/eslint-config/node'],
|
||||
|
||||
...sharedOptions(__dirname),
|
||||
|
||||
ignorePatterns: ['jest.config.js'],
|
||||
|
||||
rules: {
|
||||
'unicorn/filename-case': ['error', { case: 'kebabCase' }],
|
||||
'@typescript-eslint/no-duplicate-imports': 'off',
|
||||
|
||||
complexity: 'error',
|
||||
},
|
||||
};
|
||||
5
packages/@n8n/task-runner-node-js/jest.config.js
Normal file
5
packages/@n8n/task-runner-node-js/jest.config.js
Normal file
@@ -0,0 +1,5 @@
|
||||
/** @type {import('jest').Config} */
|
||||
module.exports = {
|
||||
...require('../../../jest.config'),
|
||||
testTimeout: 10_000,
|
||||
};
|
||||
59
packages/@n8n/task-runner-node-js/package.json
Normal file
59
packages/@n8n/task-runner-node-js/package.json
Normal file
@@ -0,0 +1,59 @@
|
||||
{
|
||||
"name": "@n8n/task-runner",
|
||||
"private": true,
|
||||
"version": "0.1.0",
|
||||
"description": "",
|
||||
"main": "dist/index.js",
|
||||
"scripts": {
|
||||
"start": "node dist/start.js",
|
||||
"dev": "pnpm build && pnpm start",
|
||||
"build": "tsc -p ./tsconfig.build.json",
|
||||
"test": "jest",
|
||||
"lint": "eslint .",
|
||||
"lintfix": "eslint . --fix",
|
||||
"watch": "tsc -w -p ./tsconfig.build.json"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=20.15",
|
||||
"pnpm": ">=9.5"
|
||||
},
|
||||
"files": [
|
||||
"src/",
|
||||
"dist/",
|
||||
"package.json",
|
||||
"tsconfig.json"
|
||||
],
|
||||
"main": "dist/index.js",
|
||||
"module": "src/index.ts",
|
||||
"types": "dist/index.d.ts",
|
||||
"packageManager": "pnpm@9.6.0",
|
||||
"devDependencies": {
|
||||
"@n8n_io/eslint-config": "^0.0.2",
|
||||
"@types/jest": "^29.5.0",
|
||||
"@types/node": "^18.13.0",
|
||||
"@types/ws": "^8.5.12",
|
||||
"@typescript-eslint/eslint-plugin": "^6.1.0",
|
||||
"eslint": "^8.38.0",
|
||||
"eslint-config-airbnb-typescript": "^17.1.0",
|
||||
"eslint-config-prettier": "^8.8.0",
|
||||
"eslint-plugin-n8n-local-rules": "^1.0.0",
|
||||
"eslint-plugin-prettier": "^5.0.0",
|
||||
"eslint-plugin-unicorn": "^48.0.0",
|
||||
"eslint-plugin-unused-imports": "^3.0.0",
|
||||
"jest": "^29.5.0",
|
||||
"nodemon": "^2.0.20",
|
||||
"prettier": "^3.0.0",
|
||||
"ts-jest": "^29.1.0",
|
||||
"ts-node": "^10.9.1",
|
||||
"tsc-alias": "^1.8.7",
|
||||
"typescript": "^5.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"jmespath": "^0.16.0",
|
||||
"luxon": "^3.5.0",
|
||||
"n8n-workflow": "workspace:*",
|
||||
"n8n-core": "workspace:*",
|
||||
"nanoid": "^3.3.6",
|
||||
"ws": "^8.18.0"
|
||||
}
|
||||
}
|
||||
42
packages/@n8n/task-runner-node-js/src/authenticator.ts
Normal file
42
packages/@n8n/task-runner-node-js/src/authenticator.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import * as a from 'node:assert/strict';
|
||||
|
||||
export type AuthOpts = {
|
||||
n8nUri: string;
|
||||
authToken: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Requests a one-time token that can be used to establish a task runner connection
|
||||
*/
|
||||
export async function authenticate(opts: AuthOpts) {
|
||||
try {
|
||||
const authEndpoint = `http://${opts.n8nUri}/rest/runners/auth`;
|
||||
const response = await fetch(authEndpoint, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
token: opts.authToken,
|
||||
}),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Invalid response status ${response.status}: ${await response.text()}`);
|
||||
}
|
||||
|
||||
const { data } = (await response.json()) as { data: { token: string } };
|
||||
const grantToken = data.token;
|
||||
a.ok(grantToken);
|
||||
|
||||
return grantToken;
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
const error = e as Error;
|
||||
|
||||
throw new Error(`Could not connect to n8n message broker ${opts.n8nUri}: ${error.message}`, {
|
||||
cause: error,
|
||||
});
|
||||
}
|
||||
}
|
||||
150
packages/@n8n/task-runner-node-js/src/code.ts
Normal file
150
packages/@n8n/task-runner-node-js/src/code.ts
Normal file
@@ -0,0 +1,150 @@
|
||||
import { runInNewContext, type Context } from 'node:vm';
|
||||
import * as a from 'node:assert';
|
||||
|
||||
import {
|
||||
type INode,
|
||||
type INodeType,
|
||||
type ITaskDataConnections,
|
||||
type IWorkflowExecuteAdditionalData,
|
||||
WorkflowDataProxy,
|
||||
type WorkflowParameters,
|
||||
} from 'n8n-workflow';
|
||||
import {
|
||||
type IDataObject,
|
||||
type IExecuteData,
|
||||
type INodeExecutionData,
|
||||
type INodeParameters,
|
||||
type IRunExecutionData,
|
||||
// type IWorkflowDataProxyAdditionalKeys,
|
||||
Workflow,
|
||||
type WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import { getAdditionalKeys } from 'n8n-core';
|
||||
|
||||
import type { TaskResultData } from './runner-types';
|
||||
import { type Task, TaskRunner } from './task-runner';
|
||||
|
||||
interface JSExecSettings {
|
||||
code: string;
|
||||
|
||||
// For workflow data proxy
|
||||
mode: WorkflowExecuteMode;
|
||||
}
|
||||
|
||||
export interface PartialAdditionalData {
|
||||
executionId?: string;
|
||||
restartExecutionId?: string;
|
||||
restApiUrl: string;
|
||||
instanceBaseUrl: string;
|
||||
formWaitingBaseUrl: string;
|
||||
webhookBaseUrl: string;
|
||||
webhookWaitingBaseUrl: string;
|
||||
webhookTestBaseUrl: string;
|
||||
currentNodeParameters?: INodeParameters;
|
||||
executionTimeoutTimestamp?: number;
|
||||
userId?: string;
|
||||
variables: IDataObject;
|
||||
}
|
||||
|
||||
export interface AllCodeTaskData {
|
||||
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
|
||||
inputData: ITaskDataConnections;
|
||||
node: INode;
|
||||
|
||||
runExecutionData: IRunExecutionData;
|
||||
runIndex: number;
|
||||
itemIndex: number;
|
||||
activeNodeName: string;
|
||||
connectionInputData: INodeExecutionData[];
|
||||
siblingParameters: INodeParameters;
|
||||
mode: WorkflowExecuteMode;
|
||||
executeData?: IExecuteData;
|
||||
defaultReturnRunIndex: number;
|
||||
selfData: IDataObject;
|
||||
contextNodeName: string;
|
||||
additionalData: PartialAdditionalData;
|
||||
}
|
||||
|
||||
export class JsTaskRunner extends TaskRunner {
|
||||
constructor(
|
||||
taskType: string,
|
||||
wsUrl: string,
|
||||
grantToken: string,
|
||||
maxConcurrency: number,
|
||||
name?: string,
|
||||
) {
|
||||
super(taskType, wsUrl, grantToken, maxConcurrency, name ?? 'JS Task Runner');
|
||||
}
|
||||
|
||||
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
|
||||
const allData = await this.requestData<AllCodeTaskData>(task.taskId, 'all');
|
||||
|
||||
const settings = task.settings;
|
||||
a.ok(settings, 'JS Code not sent to runner');
|
||||
|
||||
const workflowParams = allData.workflow;
|
||||
const workflow = new Workflow({
|
||||
...workflowParams,
|
||||
nodeTypes: {
|
||||
getByNameAndVersion() {
|
||||
return undefined as unknown as INodeType;
|
||||
},
|
||||
getByName() {
|
||||
return undefined as unknown as INodeType;
|
||||
},
|
||||
getKnownTypes() {
|
||||
return {};
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const dataProxy = new WorkflowDataProxy(
|
||||
workflow,
|
||||
allData.runExecutionData,
|
||||
allData.runIndex,
|
||||
allData.itemIndex,
|
||||
allData.activeNodeName,
|
||||
allData.connectionInputData,
|
||||
allData.siblingParameters,
|
||||
settings.mode,
|
||||
getAdditionalKeys(
|
||||
allData.additionalData as IWorkflowExecuteAdditionalData,
|
||||
allData.mode,
|
||||
allData.runExecutionData,
|
||||
),
|
||||
allData.executeData,
|
||||
allData.defaultReturnRunIndex,
|
||||
allData.selfData,
|
||||
allData.contextNodeName,
|
||||
);
|
||||
|
||||
const customConsole = {
|
||||
log: (...args: unknown[]) => {
|
||||
const logOutput = args
|
||||
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
|
||||
.join(' ');
|
||||
console.log('[JS Code]', logOutput);
|
||||
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
|
||||
},
|
||||
};
|
||||
|
||||
const context: Context = {
|
||||
require,
|
||||
module: {},
|
||||
console: customConsole,
|
||||
|
||||
...dataProxy.getDataProxy(),
|
||||
...this.buildRpcCallObject(task.taskId),
|
||||
};
|
||||
|
||||
const result = (await runInNewContext(
|
||||
`module.exports = async function() {${settings.code}\n}()`,
|
||||
context,
|
||||
)) as TaskResultData['result'];
|
||||
|
||||
return {
|
||||
result,
|
||||
customData: allData.runExecutionData.resultData.metadata,
|
||||
};
|
||||
}
|
||||
}
|
||||
2
packages/@n8n/task-runner-node-js/src/index.ts
Normal file
2
packages/@n8n/task-runner-node-js/src/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from './task-runner';
|
||||
export * from './runner-types';
|
||||
231
packages/@n8n/task-runner-node-js/src/runner-types.ts
Normal file
231
packages/@n8n/task-runner-node-js/src/runner-types.ts
Normal file
@@ -0,0 +1,231 @@
|
||||
import type { INodeExecutionData } from 'n8n-workflow';
|
||||
|
||||
export type DataRequestType = 'input' | 'node' | 'all';
|
||||
|
||||
export interface TaskResultData {
|
||||
result: INodeExecutionData[];
|
||||
customData?: Record<string, string>;
|
||||
}
|
||||
|
||||
export namespace N8nMessage {
|
||||
export namespace ToRunner {
|
||||
export interface InfoRequest {
|
||||
type: 'broker:inforequest';
|
||||
}
|
||||
|
||||
export interface RunnerRegistered {
|
||||
type: 'broker:runnerregistered';
|
||||
}
|
||||
|
||||
export interface TaskOfferAccept {
|
||||
type: 'broker:taskofferaccept';
|
||||
taskId: string;
|
||||
offerId: string;
|
||||
}
|
||||
|
||||
export interface TaskCancel {
|
||||
type: 'broker:taskcancel';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskSettings {
|
||||
type: 'broker:tasksettings';
|
||||
taskId: string;
|
||||
settings: unknown;
|
||||
}
|
||||
|
||||
export interface RPCResponse {
|
||||
type: 'broker:rpcresponse';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
status: 'success' | 'error';
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface TaskDataResponse {
|
||||
type: 'broker:taskdataresponse';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export type All =
|
||||
| InfoRequest
|
||||
| TaskOfferAccept
|
||||
| TaskCancel
|
||||
| TaskSettings
|
||||
| RunnerRegistered
|
||||
| RPCResponse
|
||||
| TaskDataResponse;
|
||||
}
|
||||
|
||||
export namespace ToRequester {
|
||||
export interface TaskReady {
|
||||
type: 'broker:taskready';
|
||||
requestId: string;
|
||||
taskId: string;
|
||||
}
|
||||
|
||||
export interface TaskDone {
|
||||
type: 'broker:taskdone';
|
||||
taskId: string;
|
||||
data: TaskResultData;
|
||||
}
|
||||
|
||||
export interface TaskError {
|
||||
type: 'broker:taskerror';
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}
|
||||
|
||||
export interface TaskDataRequest {
|
||||
type: 'broker:taskdatarequest';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
requestType: DataRequestType;
|
||||
param?: string;
|
||||
}
|
||||
|
||||
export interface RPC {
|
||||
type: 'broker:rpc';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
name: (typeof RPC_ALLOW_LIST)[number];
|
||||
params: unknown[];
|
||||
}
|
||||
|
||||
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC;
|
||||
}
|
||||
}
|
||||
|
||||
export namespace RequesterMessage {
|
||||
export namespace ToN8n {
|
||||
export interface TaskSettings {
|
||||
type: 'requester:tasksettings';
|
||||
taskId: string;
|
||||
settings: unknown;
|
||||
}
|
||||
|
||||
export interface TaskCancel {
|
||||
type: 'requester:taskcancel';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskDataResponse {
|
||||
type: 'requester:taskdataresponse';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface RPCResponse {
|
||||
type: 'requester:rpcresponse';
|
||||
taskId: string;
|
||||
callId: string;
|
||||
status: 'success' | 'error';
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface TaskRequest {
|
||||
type: 'requester:taskrequest';
|
||||
requestId: string;
|
||||
taskType: string;
|
||||
}
|
||||
|
||||
export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest;
|
||||
}
|
||||
}
|
||||
|
||||
export namespace RunnerMessage {
|
||||
export namespace ToN8n {
|
||||
export interface Info {
|
||||
type: 'runner:info';
|
||||
name: string;
|
||||
types: string[];
|
||||
}
|
||||
|
||||
export interface TaskAccepted {
|
||||
type: 'runner:taskaccepted';
|
||||
taskId: string;
|
||||
}
|
||||
|
||||
export interface TaskRejected {
|
||||
type: 'runner:taskrejected';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskDone {
|
||||
type: 'runner:taskdone';
|
||||
taskId: string;
|
||||
data: TaskResultData;
|
||||
}
|
||||
|
||||
export interface TaskError {
|
||||
type: 'runner:taskerror';
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}
|
||||
|
||||
export interface TaskOffer {
|
||||
type: 'runner:taskoffer';
|
||||
offerId: string;
|
||||
taskType: string;
|
||||
validFor: number;
|
||||
}
|
||||
|
||||
export interface TaskDataRequest {
|
||||
type: 'runner:taskdatarequest';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
requestType: DataRequestType;
|
||||
param?: string;
|
||||
}
|
||||
|
||||
export interface RPC {
|
||||
type: 'runner:rpc';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
name: (typeof RPC_ALLOW_LIST)[number];
|
||||
params: unknown[];
|
||||
}
|
||||
|
||||
export type All =
|
||||
| Info
|
||||
| TaskDone
|
||||
| TaskError
|
||||
| TaskAccepted
|
||||
| TaskRejected
|
||||
| TaskOffer
|
||||
| RPC
|
||||
| TaskDataRequest;
|
||||
}
|
||||
}
|
||||
|
||||
export const RPC_ALLOW_LIST = [
|
||||
'helpers.httpRequestWithAuthentication',
|
||||
'helpers.requestWithAuthenticationPaginated',
|
||||
// "helpers.normalizeItems"
|
||||
// "helpers.constructExecutionMetaData"
|
||||
// "helpers.assertBinaryData"
|
||||
'helpers.getBinaryDataBuffer',
|
||||
// "helpers.copyInputItems"
|
||||
// "helpers.returnJsonArray"
|
||||
'helpers.getSSHClient',
|
||||
'helpers.createReadStream',
|
||||
// "helpers.getStoragePath"
|
||||
'helpers.writeContentToFile',
|
||||
'helpers.prepareBinaryData',
|
||||
'helpers.setBinaryDataBuffer',
|
||||
'helpers.copyBinaryFile',
|
||||
'helpers.binaryToBuffer',
|
||||
// "helpers.binaryToString"
|
||||
// "helpers.getBinaryPath"
|
||||
'helpers.getBinaryStream',
|
||||
'helpers.getBinaryMetadata',
|
||||
'helpers.createDeferredPromise',
|
||||
'helpers.httpRequest',
|
||||
'logNodeOutput',
|
||||
] as const;
|
||||
34
packages/@n8n/task-runner-node-js/src/start.ts
Normal file
34
packages/@n8n/task-runner-node-js/src/start.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import * as a from 'node:assert/strict';
|
||||
|
||||
import { JsTaskRunner } from './code';
|
||||
import { authenticate } from './authenticator';
|
||||
|
||||
let _runner: JsTaskRunner;
|
||||
|
||||
type Config = {
|
||||
n8nUri: string;
|
||||
authToken: string;
|
||||
};
|
||||
|
||||
function readAndParseConfig(): Config {
|
||||
const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN;
|
||||
a.ok(authToken, 'Missing task runner auth token. Use N8N_RUNNERS_AUTH_TOKEN to configure it');
|
||||
|
||||
return {
|
||||
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678',
|
||||
authToken,
|
||||
};
|
||||
}
|
||||
|
||||
void (async function start() {
|
||||
const config = readAndParseConfig();
|
||||
|
||||
const grantToken = await authenticate({
|
||||
authToken: config.authToken,
|
||||
n8nUri: config.n8nUri,
|
||||
});
|
||||
|
||||
const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`;
|
||||
|
||||
_runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
|
||||
})();
|
||||
362
packages/@n8n/task-runner-node-js/src/task-runner.ts
Normal file
362
packages/@n8n/task-runner-node-js/src/task-runner.ts
Normal file
@@ -0,0 +1,362 @@
|
||||
import { URL } from 'node:url';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { type MessageEvent, WebSocket } from 'ws';
|
||||
import { ensureError } from 'n8n-workflow';
|
||||
|
||||
import {
|
||||
RPC_ALLOW_LIST,
|
||||
type RunnerMessage,
|
||||
type N8nMessage,
|
||||
type TaskResultData,
|
||||
} from './runner-types';
|
||||
|
||||
export interface Task<T = unknown> {
|
||||
taskId: string;
|
||||
settings?: T;
|
||||
active: boolean;
|
||||
cancelled: boolean;
|
||||
}
|
||||
|
||||
export interface TaskOffer {
|
||||
offerId: string;
|
||||
validUntil: bigint;
|
||||
}
|
||||
|
||||
interface DataRequest {
|
||||
requestId: string;
|
||||
resolve: (data: unknown) => void;
|
||||
reject: (error: unknown) => void;
|
||||
}
|
||||
|
||||
interface RPCCall {
|
||||
callId: string;
|
||||
resolve: (data: unknown) => void;
|
||||
reject: (error: unknown) => void;
|
||||
}
|
||||
|
||||
export interface RPCCallObject {
|
||||
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject;
|
||||
}
|
||||
|
||||
const VALID_TIME_MS = 1000;
|
||||
const VALID_EXTRA_MS = 100;
|
||||
|
||||
export abstract class TaskRunner {
|
||||
id: string = nanoid();
|
||||
|
||||
ws: WebSocket;
|
||||
|
||||
canSendOffers = false;
|
||||
|
||||
runningTasks: Map<Task['taskId'], Task> = new Map();
|
||||
|
||||
offerInterval: NodeJS.Timeout | undefined;
|
||||
|
||||
openOffers: Map<TaskOffer['offerId'], TaskOffer> = new Map();
|
||||
|
||||
dataRequests: Map<DataRequest['requestId'], DataRequest> = new Map();
|
||||
|
||||
rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map();
|
||||
|
||||
constructor(
|
||||
public taskType: string,
|
||||
wsUrl: string,
|
||||
grantToken: string,
|
||||
private maxConcurrency: number,
|
||||
public name?: string,
|
||||
) {
|
||||
const url = new URL(wsUrl);
|
||||
url.searchParams.append('id', this.id);
|
||||
this.ws = new WebSocket(url.toString(), {
|
||||
headers: {
|
||||
authorization: `Bearer ${grantToken}`,
|
||||
},
|
||||
});
|
||||
this.ws.addEventListener('message', this.receiveMessage);
|
||||
this.ws.addEventListener('close', this.stopTaskOffers);
|
||||
}
|
||||
|
||||
private receiveMessage = (message: MessageEvent) => {
|
||||
// eslint-disable-next-line n8n-local-rules/no-uncaught-json-parse
|
||||
const data = JSON.parse(message.data as string) as N8nMessage.ToRunner.All;
|
||||
void this.onMessage(data);
|
||||
};
|
||||
|
||||
private stopTaskOffers = () => {
|
||||
this.canSendOffers = false;
|
||||
if (this.offerInterval) {
|
||||
clearInterval(this.offerInterval);
|
||||
this.offerInterval = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
private startTaskOffers() {
|
||||
this.canSendOffers = true;
|
||||
if (this.offerInterval) {
|
||||
clearInterval(this.offerInterval);
|
||||
}
|
||||
this.offerInterval = setInterval(() => this.sendOffers(), 250);
|
||||
}
|
||||
|
||||
deleteStaleOffers() {
|
||||
this.openOffers.forEach((offer, key) => {
|
||||
if (offer.validUntil < process.hrtime.bigint()) {
|
||||
this.openOffers.delete(key);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
sendOffers() {
|
||||
this.deleteStaleOffers();
|
||||
|
||||
const offersToSend =
|
||||
this.maxConcurrency -
|
||||
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
|
||||
|
||||
for (let i = 0; i < offersToSend; i++) {
|
||||
const offer: TaskOffer = {
|
||||
offerId: nanoid(),
|
||||
validUntil: process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency
|
||||
};
|
||||
this.openOffers.set(offer.offerId, offer);
|
||||
this.send({
|
||||
type: 'runner:taskoffer',
|
||||
taskType: this.taskType,
|
||||
offerId: offer.offerId,
|
||||
validFor: VALID_TIME_MS,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
send(message: RunnerMessage.ToN8n.All) {
|
||||
this.ws.send(JSON.stringify(message));
|
||||
}
|
||||
|
||||
onMessage(message: N8nMessage.ToRunner.All) {
|
||||
switch (message.type) {
|
||||
case 'broker:inforequest':
|
||||
this.send({
|
||||
type: 'runner:info',
|
||||
name: this.name ?? 'Node.js Task Runner SDK',
|
||||
types: [this.taskType],
|
||||
});
|
||||
break;
|
||||
case 'broker:runnerregistered':
|
||||
this.startTaskOffers();
|
||||
break;
|
||||
case 'broker:taskofferaccept':
|
||||
this.offerAccepted(message.offerId, message.taskId);
|
||||
break;
|
||||
case 'broker:taskcancel':
|
||||
this.taskCancelled(message.taskId);
|
||||
break;
|
||||
case 'broker:tasksettings':
|
||||
void this.receivedSettings(message.taskId, message.settings);
|
||||
break;
|
||||
case 'broker:taskdataresponse':
|
||||
this.processDataResponse(message.requestId, message.data);
|
||||
break;
|
||||
case 'broker:rpcresponse':
|
||||
this.handleRpcResponse(message.callId, message.status, message.data);
|
||||
}
|
||||
}
|
||||
|
||||
processDataResponse(requestId: string, data: unknown) {
|
||||
const request = this.dataRequests.get(requestId);
|
||||
if (!request) {
|
||||
return;
|
||||
}
|
||||
// Deleting of the request is handled in `requestData`, using a
|
||||
// `finally` wrapped around the return
|
||||
request.resolve(data);
|
||||
}
|
||||
|
||||
hasOpenTasks() {
|
||||
return Object.values(this.runningTasks).length < this.maxConcurrency;
|
||||
}
|
||||
|
||||
offerAccepted(offerId: string, taskId: string) {
|
||||
if (!this.hasOpenTasks()) {
|
||||
this.send({
|
||||
type: 'runner:taskrejected',
|
||||
taskId,
|
||||
reason: 'No open task slots',
|
||||
});
|
||||
return;
|
||||
}
|
||||
const offer = this.openOffers.get(offerId);
|
||||
if (!offer) {
|
||||
this.send({
|
||||
type: 'runner:taskrejected',
|
||||
taskId,
|
||||
reason: 'Offer expired and no open task slots',
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
this.openOffers.delete(offerId);
|
||||
}
|
||||
|
||||
this.runningTasks.set(taskId, {
|
||||
taskId,
|
||||
active: false,
|
||||
cancelled: false,
|
||||
});
|
||||
|
||||
this.send({
|
||||
type: 'runner:taskaccepted',
|
||||
taskId,
|
||||
});
|
||||
}
|
||||
|
||||
taskCancelled(taskId: string) {
|
||||
const task = this.runningTasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
task.cancelled = true;
|
||||
if (task.active) {
|
||||
// TODO
|
||||
} else {
|
||||
this.runningTasks.delete(taskId);
|
||||
}
|
||||
this.sendOffers();
|
||||
}
|
||||
|
||||
taskErrored(taskId: string, error: unknown) {
|
||||
this.send({
|
||||
type: 'runner:taskerror',
|
||||
taskId,
|
||||
error,
|
||||
});
|
||||
this.runningTasks.delete(taskId);
|
||||
this.sendOffers();
|
||||
}
|
||||
|
||||
taskDone(taskId: string, data: RunnerMessage.ToN8n.TaskDone['data']) {
|
||||
this.send({
|
||||
type: 'runner:taskdone',
|
||||
taskId,
|
||||
data,
|
||||
});
|
||||
this.runningTasks.delete(taskId);
|
||||
this.sendOffers();
|
||||
}
|
||||
|
||||
async receivedSettings(taskId: string, settings: unknown) {
|
||||
const task = this.runningTasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
if (task.cancelled) {
|
||||
this.runningTasks.delete(taskId);
|
||||
return;
|
||||
}
|
||||
task.settings = settings;
|
||||
task.active = true;
|
||||
try {
|
||||
const data = await this.executeTask(task);
|
||||
this.taskDone(taskId, data);
|
||||
} catch (e) {
|
||||
if (ensureError(e)) {
|
||||
this.taskErrored(taskId, (e as Error).message);
|
||||
} else {
|
||||
this.taskErrored(taskId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
async executeTask(_task: Task): Promise<TaskResultData> {
|
||||
throw new Error('Unimplemented');
|
||||
}
|
||||
|
||||
async requestData<T = unknown>(
|
||||
taskId: Task['taskId'],
|
||||
type: RunnerMessage.ToN8n.TaskDataRequest['requestType'],
|
||||
param?: string,
|
||||
): Promise<T> {
|
||||
const requestId = nanoid();
|
||||
|
||||
const p = new Promise<T>((resolve, reject) => {
|
||||
this.dataRequests.set(requestId, {
|
||||
requestId,
|
||||
resolve: resolve as (data: unknown) => void,
|
||||
reject,
|
||||
});
|
||||
});
|
||||
|
||||
this.send({
|
||||
type: 'runner:taskdatarequest',
|
||||
taskId,
|
||||
requestId,
|
||||
requestType: type,
|
||||
param,
|
||||
});
|
||||
|
||||
try {
|
||||
return await p;
|
||||
} finally {
|
||||
this.dataRequests.delete(requestId);
|
||||
}
|
||||
}
|
||||
|
||||
async makeRpcCall(taskId: string, name: RunnerMessage.ToN8n.RPC['name'], params: unknown[]) {
|
||||
const callId = nanoid();
|
||||
|
||||
const dataPromise = new Promise((resolve, reject) => {
|
||||
this.rpcCalls.set(callId, {
|
||||
callId,
|
||||
resolve,
|
||||
reject,
|
||||
});
|
||||
});
|
||||
|
||||
this.send({
|
||||
type: 'runner:rpc',
|
||||
callId,
|
||||
taskId,
|
||||
name,
|
||||
params,
|
||||
});
|
||||
|
||||
try {
|
||||
return await dataPromise;
|
||||
} finally {
|
||||
this.rpcCalls.delete(callId);
|
||||
}
|
||||
}
|
||||
|
||||
handleRpcResponse(
|
||||
callId: string,
|
||||
status: N8nMessage.ToRunner.RPCResponse['status'],
|
||||
data: unknown,
|
||||
) {
|
||||
const call = this.rpcCalls.get(callId);
|
||||
if (!call) {
|
||||
return;
|
||||
}
|
||||
if (status === 'success') {
|
||||
call.resolve(data);
|
||||
} else {
|
||||
call.reject(typeof data === 'string' ? new Error(data) : data);
|
||||
}
|
||||
}
|
||||
|
||||
buildRpcCallObject(taskId: string) {
|
||||
const rpcObject: RPCCallObject = {};
|
||||
for (const r of RPC_ALLOW_LIST) {
|
||||
const splitPath = r.split('.');
|
||||
let obj = rpcObject;
|
||||
|
||||
splitPath.forEach((s, index) => {
|
||||
if (index !== splitPath.length - 1) {
|
||||
obj[s] = {};
|
||||
obj = obj[s];
|
||||
return;
|
||||
}
|
||||
obj[s] = async (...args: unknown[]) => this.makeRpcCall(taskId, r, args);
|
||||
});
|
||||
}
|
||||
return rpcObject;
|
||||
}
|
||||
}
|
||||
10
packages/@n8n/task-runner-node-js/tsconfig.build.json
Normal file
10
packages/@n8n/task-runner-node-js/tsconfig.build.json
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"extends": ["./tsconfig.json", "../../../tsconfig.build.json"],
|
||||
"compilerOptions": {
|
||||
"rootDir": "src",
|
||||
"outDir": "dist",
|
||||
"tsBuildInfoFile": "dist/build.tsbuildinfo"
|
||||
},
|
||||
"include": ["src/**/*.ts"],
|
||||
"exclude": ["test/**", "src/**/__tests__/**"]
|
||||
}
|
||||
14
packages/@n8n/task-runner-node-js/tsconfig.json
Normal file
14
packages/@n8n/task-runner-node-js/tsconfig.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"extends": ["../../../tsconfig.json", "../../../tsconfig.backend.json"],
|
||||
"compilerOptions": {
|
||||
"rootDir": ".",
|
||||
"emitDecoratorMetadata": true,
|
||||
"experimentalDecorators": true,
|
||||
"baseUrl": "src",
|
||||
"paths": {
|
||||
"@/*": ["./*"]
|
||||
},
|
||||
"tsBuildInfoFile": "dist/typecheck.tsbuildinfo"
|
||||
},
|
||||
"include": ["src/**/*.ts", "test/**/*.ts"]
|
||||
}
|
||||
@@ -92,6 +92,7 @@
|
||||
"@n8n/localtunnel": "3.0.0",
|
||||
"@n8n/n8n-nodes-langchain": "workspace:*",
|
||||
"@n8n/permissions": "workspace:*",
|
||||
"@n8n/task-runner": "workspace:*",
|
||||
"@n8n/typeorm": "0.3.20-12",
|
||||
"@n8n_io/ai-assistant-sdk": "1.9.4",
|
||||
"@n8n_io/license-sdk": "2.13.1",
|
||||
|
||||
@@ -119,6 +119,8 @@ export abstract class AbstractServer {
|
||||
|
||||
protected setupPushServer() {}
|
||||
|
||||
protected setupRunnerServer() {}
|
||||
|
||||
private async setupHealthCheck() {
|
||||
// main health check should not care about DB connections
|
||||
this.app.get('/healthz', async (_req, res) => {
|
||||
@@ -182,6 +184,10 @@ export abstract class AbstractServer {
|
||||
if (!inTest) {
|
||||
await this.setupErrorHandlers();
|
||||
this.setupPushServer();
|
||||
|
||||
if (!this.globalConfig.taskRunners.disabled) {
|
||||
this.setupRunnerServer();
|
||||
}
|
||||
}
|
||||
|
||||
this.setupCommonMiddlewares();
|
||||
|
||||
@@ -21,6 +21,8 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
|
||||
import { EventService } from '@/events/event.service';
|
||||
import { ExecutionService } from '@/executions/execution.service';
|
||||
import { License } from '@/license';
|
||||
import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager';
|
||||
import { TaskManager } from '@/runners/task-managers/task-manager';
|
||||
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
||||
import { Server } from '@/server';
|
||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||
@@ -220,6 +222,10 @@ export class Start extends BaseCommand {
|
||||
if (!this.globalConfig.endpoints.disableUi) {
|
||||
await this.generateStaticAssets();
|
||||
}
|
||||
|
||||
if (!this.globalConfig.taskRunners.disabled) {
|
||||
Container.set(TaskManager, new SingleMainTaskManager());
|
||||
}
|
||||
}
|
||||
|
||||
async initOrchestration() {
|
||||
|
||||
504
packages/cli/src/runners/__tests__/task-broker.test.ts
Normal file
504
packages/cli/src/runners/__tests__/task-broker.test.ts
Normal file
@@ -0,0 +1,504 @@
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
||||
import { TaskRejectError } from '../errors';
|
||||
import type { RunnerMessage, TaskResultData } from '../runner-types';
|
||||
import { TaskBroker } from '../task-broker.service';
|
||||
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
|
||||
|
||||
describe('TaskBroker', () => {
|
||||
let taskBroker: TaskBroker;
|
||||
|
||||
beforeEach(() => {
|
||||
taskBroker = new TaskBroker(mock());
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('expireTasks', () => {
|
||||
it('should remove expired task offers and keep valid task offers', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const validOffer: TaskOffer = {
|
||||
offerId: 'valid',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
|
||||
};
|
||||
|
||||
const expiredOffer1: TaskOffer = {
|
||||
offerId: 'expired1',
|
||||
runnerId: 'runner2',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
|
||||
};
|
||||
|
||||
const expiredOffer2: TaskOffer = {
|
||||
offerId: 'expired2',
|
||||
runnerId: 'runner3',
|
||||
taskType: 'taskType1',
|
||||
validFor: 2000,
|
||||
validUntil: now - BigInt(2000 * 1_000_000), // 2 seconds in the past
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([validOffer, expiredOffer1, expiredOffer2]);
|
||||
|
||||
taskBroker.expireTasks();
|
||||
|
||||
const offers = taskBroker.getPendingTaskOffers();
|
||||
|
||||
expect(offers).toHaveLength(1);
|
||||
expect(offers[0]).toEqual(validOffer);
|
||||
});
|
||||
});
|
||||
|
||||
describe('registerRunner', () => {
|
||||
it('should add a runner to known runners', () => {
|
||||
const runnerId = 'runner1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, messageCallback);
|
||||
|
||||
const knownRunners = taskBroker.getKnownRunners();
|
||||
const runnerIds = [...knownRunners.keys()];
|
||||
|
||||
expect(runnerIds).toHaveLength(1);
|
||||
expect(runnerIds[0]).toEqual(runnerId);
|
||||
|
||||
expect(knownRunners.get(runnerId)?.runner).toEqual(runner);
|
||||
expect(knownRunners.get(runnerId)?.messageCallback).toEqual(messageCallback);
|
||||
});
|
||||
});
|
||||
|
||||
describe('registerRequester', () => {
|
||||
it('should add a requester to known requesters', () => {
|
||||
const requesterId = 'requester1';
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRequester(requesterId, messageCallback);
|
||||
|
||||
const knownRequesters = taskBroker.getKnownRequesters();
|
||||
const requesterIds = [...knownRequesters.keys()];
|
||||
|
||||
expect(requesterIds).toHaveLength(1);
|
||||
expect(requesterIds[0]).toEqual(requesterId);
|
||||
|
||||
expect(knownRequesters.get(requesterId)).toEqual(messageCallback);
|
||||
});
|
||||
});
|
||||
|
||||
describe('deregisterRunner', () => {
|
||||
it('should remove a runner from known runners', () => {
|
||||
const runnerId = 'runner1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, messageCallback);
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
|
||||
const knownRunners = taskBroker.getKnownRunners();
|
||||
const runnerIds = Object.keys(knownRunners);
|
||||
|
||||
expect(runnerIds).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('deregisterRequester', () => {
|
||||
it('should remove a requester from known requesters', () => {
|
||||
const requesterId = 'requester1';
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRequester(requesterId, messageCallback);
|
||||
taskBroker.deregisterRequester(requesterId);
|
||||
|
||||
const knownRequesters = taskBroker.getKnownRequesters();
|
||||
const requesterIds = Object.keys(knownRequesters);
|
||||
|
||||
expect(requesterIds).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('taskRequested', () => {
|
||||
it('should match a pending offer to an incoming request', async () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([offer]);
|
||||
|
||||
const request: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
taskType: 'taskType1',
|
||||
};
|
||||
|
||||
jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue(); // allow Jest to exit cleanly
|
||||
|
||||
taskBroker.taskRequested(request);
|
||||
|
||||
expect(taskBroker.acceptOffer).toHaveBeenCalled();
|
||||
expect(taskBroker.getPendingTaskOffers()).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('taskOffered', () => {
|
||||
it('should match a pending request to an incoming offer', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const request: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
taskType: 'taskType1',
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskRequests([request]);
|
||||
|
||||
const offer: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
};
|
||||
|
||||
jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue(); // allow Jest to exit cleanly
|
||||
|
||||
taskBroker.taskOffered(offer);
|
||||
|
||||
expect(taskBroker.acceptOffer).toHaveBeenCalled();
|
||||
expect(taskBroker.getPendingTaskOffers()).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('settleTasks', () => {
|
||||
it('should match task offers with task requests by task type', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer1: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
};
|
||||
|
||||
const offer2: TaskOffer = {
|
||||
offerId: 'offer2',
|
||||
runnerId: 'runner2',
|
||||
taskType: 'taskType2',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
};
|
||||
|
||||
const request1: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
taskType: 'taskType1',
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
const request2: TaskRequest = {
|
||||
requestId: 'request2',
|
||||
requesterId: 'requester2',
|
||||
taskType: 'taskType2',
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
const request3: TaskRequest = {
|
||||
requestId: 'request3',
|
||||
requesterId: 'requester3',
|
||||
taskType: 'taskType3', // will have no match
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([offer1, offer2]);
|
||||
taskBroker.setPendingTaskRequests([request1, request2, request3]);
|
||||
|
||||
const acceptOfferSpy = jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue();
|
||||
|
||||
taskBroker.settleTasks();
|
||||
|
||||
expect(acceptOfferSpy).toHaveBeenCalledTimes(2);
|
||||
expect(acceptOfferSpy).toHaveBeenCalledWith(offer1, request1);
|
||||
expect(acceptOfferSpy).toHaveBeenCalledWith(offer2, request2);
|
||||
|
||||
const remainingOffers = taskBroker.getPendingTaskOffers();
|
||||
expect(remainingOffers).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('should not match a request whose acceptance is in progress', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
};
|
||||
|
||||
const request: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
taskType: 'taskType1',
|
||||
acceptInProgress: true,
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([offer]);
|
||||
taskBroker.setPendingTaskRequests([request]);
|
||||
|
||||
const acceptOfferSpy = jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue();
|
||||
|
||||
taskBroker.settleTasks();
|
||||
|
||||
expect(acceptOfferSpy).not.toHaveBeenCalled();
|
||||
|
||||
const remainingOffers = taskBroker.getPendingTaskOffers();
|
||||
expect(remainingOffers).toHaveLength(1);
|
||||
expect(remainingOffers[0]).toEqual(offer);
|
||||
|
||||
const remainingRequests = taskBroker.getPendingTaskRequests();
|
||||
expect(remainingRequests).toHaveLength(1);
|
||||
expect(remainingRequests[0]).toEqual(request);
|
||||
});
|
||||
|
||||
it('should expire tasks before settling', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const validOffer: TaskOffer = {
|
||||
offerId: 'valid',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
|
||||
};
|
||||
|
||||
const expiredOffer: TaskOffer = {
|
||||
offerId: 'expired',
|
||||
runnerId: 'runner2',
|
||||
taskType: 'taskType2', // will be removed before matching
|
||||
validFor: 1000,
|
||||
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
|
||||
};
|
||||
|
||||
const request1: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
taskType: 'taskType1',
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
const request2: TaskRequest = {
|
||||
requestId: 'request2',
|
||||
requesterId: 'requester2',
|
||||
taskType: 'taskType2',
|
||||
acceptInProgress: false,
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([validOffer, expiredOffer]);
|
||||
taskBroker.setPendingTaskRequests([request1, request2]);
|
||||
|
||||
const acceptOfferSpy = jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue();
|
||||
|
||||
taskBroker.settleTasks();
|
||||
|
||||
expect(acceptOfferSpy).toHaveBeenCalledTimes(1);
|
||||
expect(acceptOfferSpy).toHaveBeenCalledWith(validOffer, request1);
|
||||
|
||||
const remainingOffers = taskBroker.getPendingTaskOffers();
|
||||
expect(remainingOffers).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('onRunnerMessage', () => {
|
||||
it('should handle `runner:taskaccepted` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
|
||||
const message: RunnerMessage.ToN8n.TaskAccepted = {
|
||||
type: 'runner:taskaccepted',
|
||||
taskId,
|
||||
};
|
||||
|
||||
const accept = jest.fn();
|
||||
const reject = jest.fn();
|
||||
|
||||
taskBroker.setRunnerAcceptRejects({ [taskId]: { accept, reject } });
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
const runnerAcceptRejects = taskBroker.getRunnerAcceptRejects();
|
||||
|
||||
expect(accept).toHaveBeenCalled();
|
||||
expect(reject).not.toHaveBeenCalled();
|
||||
expect(runnerAcceptRejects.get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle `runner:taskrejected` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
const rejectionReason = 'Task execution failed';
|
||||
|
||||
const message: RunnerMessage.ToN8n.TaskRejected = {
|
||||
type: 'runner:taskrejected',
|
||||
taskId,
|
||||
reason: rejectionReason,
|
||||
};
|
||||
|
||||
const accept = jest.fn();
|
||||
const reject = jest.fn();
|
||||
|
||||
taskBroker.setRunnerAcceptRejects({ [taskId]: { accept, reject } });
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
const runnerAcceptRejects = taskBroker.getRunnerAcceptRejects();
|
||||
|
||||
expect(accept).not.toHaveBeenCalled();
|
||||
expect(reject).toHaveBeenCalledWith(new TaskRejectError(rejectionReason));
|
||||
expect(runnerAcceptRejects.get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle `runner:taskdone` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
const requesterId = 'requester1';
|
||||
const data = mock<TaskResultData>();
|
||||
|
||||
const message: RunnerMessage.ToN8n.TaskDone = {
|
||||
type: 'runner:taskdone',
|
||||
taskId,
|
||||
data,
|
||||
};
|
||||
|
||||
const requesterMessageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
|
||||
});
|
||||
taskBroker.registerRequester(requesterId, requesterMessageCallback);
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
expect(requesterMessageCallback).toHaveBeenCalledWith({
|
||||
type: 'broker:taskdone',
|
||||
taskId,
|
||||
data,
|
||||
});
|
||||
|
||||
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle `runner:taskerror` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
const requesterId = 'requester1';
|
||||
const errorMessage = 'Task execution failed';
|
||||
|
||||
const message: RunnerMessage.ToN8n.TaskError = {
|
||||
type: 'runner:taskerror',
|
||||
taskId,
|
||||
error: errorMessage,
|
||||
};
|
||||
|
||||
const requesterMessageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
|
||||
});
|
||||
taskBroker.registerRequester(requesterId, requesterMessageCallback);
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
expect(requesterMessageCallback).toHaveBeenCalledWith({
|
||||
type: 'broker:taskerror',
|
||||
taskId,
|
||||
error: errorMessage,
|
||||
});
|
||||
|
||||
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle `runner:taskdatarequest` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
const requesterId = 'requester1';
|
||||
const requestId = 'request1';
|
||||
const requestType = 'input';
|
||||
const param = 'test_param';
|
||||
|
||||
const message: RunnerMessage.ToN8n.TaskDataRequest = {
|
||||
type: 'runner:taskdatarequest',
|
||||
taskId,
|
||||
requestId,
|
||||
requestType,
|
||||
param,
|
||||
};
|
||||
|
||||
const requesterMessageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
|
||||
});
|
||||
taskBroker.registerRequester(requesterId, requesterMessageCallback);
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
expect(requesterMessageCallback).toHaveBeenCalledWith({
|
||||
type: 'broker:taskdatarequest',
|
||||
taskId,
|
||||
requestId,
|
||||
requestType,
|
||||
param,
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle `runner:rpc` message', async () => {
|
||||
const runnerId = 'runner1';
|
||||
const taskId = 'task1';
|
||||
const requesterId = 'requester1';
|
||||
const callId = 'call1';
|
||||
const rpcName = 'helpers.httpRequestWithAuthentication';
|
||||
const rpcParams = ['param1', 'param2'];
|
||||
|
||||
const message: RunnerMessage.ToN8n.RPC = {
|
||||
type: 'runner:rpc',
|
||||
taskId,
|
||||
callId,
|
||||
name: rpcName,
|
||||
params: rpcParams,
|
||||
};
|
||||
|
||||
const requesterMessageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
|
||||
});
|
||||
taskBroker.registerRequester(requesterId, requesterMessageCallback);
|
||||
|
||||
await taskBroker.onRunnerMessage(runnerId, message);
|
||||
|
||||
expect(requesterMessageCallback).toHaveBeenCalledWith({
|
||||
type: 'broker:rpc',
|
||||
taskId,
|
||||
callId,
|
||||
name: rpcName,
|
||||
params: rpcParams,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,115 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import type { NextFunction, Response } from 'express';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
||||
import { CacheService } from '@/services/cache/cache.service';
|
||||
import { mockInstance } from '@test/mocking';
|
||||
|
||||
import { BadRequestError } from '../../../errors/response-errors/bad-request.error';
|
||||
import { ForbiddenError } from '../../../errors/response-errors/forbidden.error';
|
||||
import type { AuthlessRequest } from '../../../requests';
|
||||
import type { TaskRunnerServerInitRequest } from '../../runner-types';
|
||||
import { TaskRunnerAuthController } from '../task-runner-auth.controller';
|
||||
import { TaskRunnerAuthService } from '../task-runner-auth.service';
|
||||
|
||||
describe('TaskRunnerAuthController', () => {
|
||||
const globalConfig = mockInstance(GlobalConfig, {
|
||||
cache: {
|
||||
backend: 'memory',
|
||||
memory: {
|
||||
maxSize: 1024,
|
||||
ttl: 9999,
|
||||
},
|
||||
},
|
||||
taskRunners: {
|
||||
authToken: 'random-secret',
|
||||
},
|
||||
});
|
||||
const TTL = 100;
|
||||
const cacheService = new CacheService(globalConfig);
|
||||
const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL);
|
||||
const authController = new TaskRunnerAuthController(authService);
|
||||
|
||||
const createMockGrantTokenReq = (token?: string) =>
|
||||
({
|
||||
body: {
|
||||
token,
|
||||
},
|
||||
}) as unknown as AuthlessRequest;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('createGrantToken', () => {
|
||||
it('should throw BadRequestError when auth token is missing', async () => {
|
||||
const req = createMockGrantTokenReq();
|
||||
|
||||
// Act
|
||||
await expect(authController.createGrantToken(req)).rejects.toThrowError(BadRequestError);
|
||||
});
|
||||
|
||||
it('should throw ForbiddenError when auth token is invalid', async () => {
|
||||
const req = createMockGrantTokenReq('invalid');
|
||||
|
||||
// Act
|
||||
await expect(authController.createGrantToken(req)).rejects.toThrowError(ForbiddenError);
|
||||
});
|
||||
|
||||
it('should return rant token when auth token is valid', async () => {
|
||||
const req = createMockGrantTokenReq('random-secret');
|
||||
|
||||
// Act
|
||||
await expect(authController.createGrantToken(req)).resolves.toStrictEqual({
|
||||
token: expect.any(String),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('authMiddleware', () => {
|
||||
const res = mock<Response>();
|
||||
const next = jest.fn() as NextFunction;
|
||||
|
||||
const createMockReqWithToken = (token?: string) =>
|
||||
mock<TaskRunnerServerInitRequest>({
|
||||
headers: {
|
||||
authorization: `Bearer ${token}`,
|
||||
},
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
res.status.mockReturnThis();
|
||||
});
|
||||
|
||||
it('should respond with 401 when grant token is missing', async () => {
|
||||
const req = mock<TaskRunnerServerInitRequest>({});
|
||||
|
||||
await authController.authMiddleware(req, res, next);
|
||||
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
expect(res.status).toHaveBeenCalledWith(401);
|
||||
expect(res.json).toHaveBeenCalledWith({ code: 401, message: 'Unauthorized' });
|
||||
});
|
||||
|
||||
it('should respond with 403 when grant token is invalid', async () => {
|
||||
const req = createMockReqWithToken('invalid');
|
||||
|
||||
await authController.authMiddleware(req, res, next);
|
||||
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
expect(res.status).toHaveBeenCalledWith(403);
|
||||
expect(res.json).toHaveBeenCalledWith({ code: 403, message: 'Forbidden' });
|
||||
});
|
||||
|
||||
it('should call next() when grant token is valid', async () => {
|
||||
const { token: validToken } = await authController.createGrantToken(
|
||||
createMockGrantTokenReq('random-secret'),
|
||||
);
|
||||
|
||||
await authController.authMiddleware(createMockReqWithToken(validToken), res, next);
|
||||
|
||||
expect(next).toHaveBeenCalled();
|
||||
expect(res.status).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,92 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { sleep } from 'n8n-workflow';
|
||||
|
||||
import config from '@/config';
|
||||
import { CacheService } from '@/services/cache/cache.service';
|
||||
|
||||
import { mockInstance } from '../../../../test/shared/mocking';
|
||||
import { TaskRunnerAuthService } from '../task-runner-auth.service';
|
||||
|
||||
describe('TaskRunnerAuthService', () => {
|
||||
config.set('taskRunners.authToken', 'random-secret');
|
||||
|
||||
const globalConfig = mockInstance(GlobalConfig, {
|
||||
cache: {
|
||||
backend: 'memory',
|
||||
memory: {
|
||||
maxSize: 1024,
|
||||
ttl: 9999,
|
||||
},
|
||||
},
|
||||
taskRunners: {
|
||||
authToken: 'random-secret',
|
||||
},
|
||||
});
|
||||
const TTL = 100;
|
||||
const cacheService = new CacheService(globalConfig);
|
||||
const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL);
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('isValidAuthToken', () => {
|
||||
it('should be valid for the configured token', () => {
|
||||
expect(authService.isValidAuthToken('random-secret'));
|
||||
});
|
||||
|
||||
it('should be invalid for anything else', () => {
|
||||
expect(authService.isValidAuthToken('!random-secret'));
|
||||
});
|
||||
});
|
||||
|
||||
describe('createGrantToken', () => {
|
||||
it('should generate a random token', async () => {
|
||||
expect(typeof (await authService.createGrantToken())).toBe('string');
|
||||
});
|
||||
|
||||
it('should store the generated token in cache', async () => {
|
||||
// Arrange
|
||||
const cacheSetSpy = jest.spyOn(cacheService, 'set');
|
||||
|
||||
// Act
|
||||
const token = await authService.createGrantToken();
|
||||
|
||||
// Assert
|
||||
expect(cacheSetSpy).toHaveBeenCalledWith(`grant-token:${token}`, '1', TTL);
|
||||
});
|
||||
});
|
||||
|
||||
describe('tryConsumeGrantToken', () => {
|
||||
it('should return false for an invalid grant token', async () => {
|
||||
expect(await authService.tryConsumeGrantToken('random-secret')).toBe(false);
|
||||
});
|
||||
|
||||
it('should return true for a valid grant token', async () => {
|
||||
// Arrange
|
||||
const grantToken = await authService.createGrantToken();
|
||||
|
||||
// Act
|
||||
expect(await authService.tryConsumeGrantToken(grantToken)).toBe(true);
|
||||
});
|
||||
|
||||
it('should return false for a already used grant token', async () => {
|
||||
// Arrange
|
||||
const grantToken = await authService.createGrantToken();
|
||||
|
||||
// Act
|
||||
expect(await authService.tryConsumeGrantToken(grantToken)).toBe(true);
|
||||
expect(await authService.tryConsumeGrantToken(grantToken)).toBe(false);
|
||||
});
|
||||
|
||||
it('should return false for an expired grant token', async () => {
|
||||
// Arrange
|
||||
const grantToken = await authService.createGrantToken();
|
||||
|
||||
// Act
|
||||
await sleep(TTL + 1);
|
||||
|
||||
expect(await authService.tryConsumeGrantToken(grantToken)).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
62
packages/cli/src/runners/auth/task-runner-auth.controller.ts
Normal file
62
packages/cli/src/runners/auth/task-runner-auth.controller.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import type { NextFunction, Response } from 'express';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import type { AuthlessRequest } from '@/requests';
|
||||
|
||||
import { taskRunnerAuthRequestBodySchema } from './task-runner-auth.schema';
|
||||
import { TaskRunnerAuthService } from './task-runner-auth.service';
|
||||
import { BadRequestError } from '../../errors/response-errors/bad-request.error';
|
||||
import { ForbiddenError } from '../../errors/response-errors/forbidden.error';
|
||||
import type { TaskRunnerServerInitRequest } from '../runner-types';
|
||||
|
||||
/**
|
||||
* Controller responsible for authenticating Task Runner connections
|
||||
*/
|
||||
@Service()
|
||||
export class TaskRunnerAuthController {
|
||||
constructor(private readonly taskRunnerAuthService: TaskRunnerAuthService) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
this.authMiddleware = this.authMiddleware.bind(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the provided auth token and creates and responds with a grant token,
|
||||
* which can be used to initiate a task runner connection.
|
||||
*/
|
||||
async createGrantToken(req: AuthlessRequest) {
|
||||
const result = await taskRunnerAuthRequestBodySchema.safeParseAsync(req.body);
|
||||
if (!result.success) {
|
||||
throw new BadRequestError(result.error.errors[0].code);
|
||||
}
|
||||
|
||||
const { token: authToken } = result.data;
|
||||
if (!this.taskRunnerAuthService.isValidAuthToken(authToken)) {
|
||||
throw new ForbiddenError();
|
||||
}
|
||||
|
||||
const grantToken = await this.taskRunnerAuthService.createGrantToken();
|
||||
return {
|
||||
token: grantToken,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Middleware to authenticate task runner init requests
|
||||
*/
|
||||
async authMiddleware(req: TaskRunnerServerInitRequest, res: Response, next: NextFunction) {
|
||||
const authHeader = req.headers.authorization;
|
||||
if (typeof authHeader !== 'string' || !authHeader.startsWith('Bearer ')) {
|
||||
res.status(401).json({ code: 401, message: 'Unauthorized' });
|
||||
return;
|
||||
}
|
||||
|
||||
const grantToken = authHeader.slice('Bearer '.length);
|
||||
const isConsumed = await this.taskRunnerAuthService.tryConsumeGrantToken(grantToken);
|
||||
if (!isConsumed) {
|
||||
res.status(403).json({ code: 403, message: 'Forbidden' });
|
||||
return;
|
||||
}
|
||||
|
||||
next();
|
||||
}
|
||||
}
|
||||
5
packages/cli/src/runners/auth/task-runner-auth.schema.ts
Normal file
5
packages/cli/src/runners/auth/task-runner-auth.schema.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const taskRunnerAuthRequestBodySchema = z.object({
|
||||
token: z.string().min(1),
|
||||
});
|
||||
56
packages/cli/src/runners/auth/task-runner-auth.service.ts
Normal file
56
packages/cli/src/runners/auth/task-runner-auth.service.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { randomBytes } from 'crypto';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import { Time } from '@/constants';
|
||||
import { CacheService } from '@/services/cache/cache.service';
|
||||
|
||||
const GRANT_TOKEN_TTL = 15 * Time.seconds.toMilliseconds;
|
||||
|
||||
@Service()
|
||||
export class TaskRunnerAuthService {
|
||||
constructor(
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly cacheService: CacheService,
|
||||
// For unit testing purposes
|
||||
private readonly grantTokenTtl = GRANT_TOKEN_TTL,
|
||||
) {}
|
||||
|
||||
isValidAuthToken(token: string) {
|
||||
return token === this.globalConfig.taskRunners.authToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns grant token that can be used to establish a task runner connection
|
||||
*/
|
||||
async createGrantToken() {
|
||||
const grantToken = this.generateGrantToken();
|
||||
|
||||
const key = this.cacheKeyForGrantToken(grantToken);
|
||||
await this.cacheService.set(key, '1', this.grantTokenTtl);
|
||||
|
||||
return grantToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given `grantToken` is a valid token and marks it as
|
||||
* used.
|
||||
*/
|
||||
async tryConsumeGrantToken(grantToken: string) {
|
||||
const key = this.cacheKeyForGrantToken(grantToken);
|
||||
const consumed = await this.cacheService.get<string>(key);
|
||||
// Not found from cache --> Invalid token
|
||||
if (consumed === undefined) return false;
|
||||
|
||||
await this.cacheService.delete(key);
|
||||
return true;
|
||||
}
|
||||
|
||||
private generateGrantToken() {
|
||||
return randomBytes(32).toString('hex');
|
||||
}
|
||||
|
||||
private cacheKeyForGrantToken(grantToken: string) {
|
||||
return `grant-token:${grantToken}`;
|
||||
}
|
||||
}
|
||||
9
packages/cli/src/runners/errors.ts
Normal file
9
packages/cli/src/runners/errors.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class TaskRejectError extends ApplicationError {
|
||||
constructor(public reason: string) {
|
||||
super(`Task rejected with reason: ${reason}`, { level: 'info' });
|
||||
}
|
||||
}
|
||||
|
||||
export class TaskError extends ApplicationError {}
|
||||
243
packages/cli/src/runners/runner-types.ts
Normal file
243
packages/cli/src/runners/runner-types.ts
Normal file
@@ -0,0 +1,243 @@
|
||||
import type { Response } from 'express';
|
||||
import type { INodeExecutionData } from 'n8n-workflow';
|
||||
import type WebSocket from 'ws';
|
||||
|
||||
import type { TaskRunner } from './task-broker.service';
|
||||
import type { AuthlessRequest } from '../requests';
|
||||
|
||||
export type DataRequestType = 'input' | 'node' | 'all';
|
||||
|
||||
export interface TaskResultData {
|
||||
result: INodeExecutionData[];
|
||||
customData?: Record<string, string>;
|
||||
}
|
||||
|
||||
export interface TaskRunnerServerInitRequest
|
||||
extends AuthlessRequest<{}, {}, {}, { id: TaskRunner['id']; token?: string }> {
|
||||
ws: WebSocket;
|
||||
}
|
||||
|
||||
export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest };
|
||||
|
||||
export namespace N8nMessage {
|
||||
export namespace ToRunner {
|
||||
export interface InfoRequest {
|
||||
type: 'broker:inforequest';
|
||||
}
|
||||
|
||||
export interface RunnerRegistered {
|
||||
type: 'broker:runnerregistered';
|
||||
}
|
||||
|
||||
export interface TaskOfferAccept {
|
||||
type: 'broker:taskofferaccept';
|
||||
taskId: string;
|
||||
offerId: string;
|
||||
}
|
||||
|
||||
export interface TaskCancel {
|
||||
type: 'broker:taskcancel';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskSettings {
|
||||
type: 'broker:tasksettings';
|
||||
taskId: string;
|
||||
settings: unknown;
|
||||
}
|
||||
|
||||
export interface RPCResponse {
|
||||
type: 'broker:rpcresponse';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
status: 'success' | 'error';
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface TaskDataResponse {
|
||||
type: 'broker:taskdataresponse';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export type All =
|
||||
| InfoRequest
|
||||
| TaskOfferAccept
|
||||
| TaskCancel
|
||||
| TaskSettings
|
||||
| RunnerRegistered
|
||||
| RPCResponse
|
||||
| TaskDataResponse;
|
||||
}
|
||||
|
||||
export namespace ToRequester {
|
||||
export interface TaskReady {
|
||||
type: 'broker:taskready';
|
||||
requestId: string;
|
||||
taskId: string;
|
||||
}
|
||||
|
||||
export interface TaskDone {
|
||||
type: 'broker:taskdone';
|
||||
taskId: string;
|
||||
data: TaskResultData;
|
||||
}
|
||||
|
||||
export interface TaskError {
|
||||
type: 'broker:taskerror';
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}
|
||||
|
||||
export interface TaskDataRequest {
|
||||
type: 'broker:taskdatarequest';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
requestType: DataRequestType;
|
||||
param?: string;
|
||||
}
|
||||
|
||||
export interface RPC {
|
||||
type: 'broker:rpc';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
name: (typeof RPC_ALLOW_LIST)[number];
|
||||
params: unknown[];
|
||||
}
|
||||
|
||||
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC;
|
||||
}
|
||||
}
|
||||
|
||||
export namespace RequesterMessage {
|
||||
export namespace ToN8n {
|
||||
export interface TaskSettings {
|
||||
type: 'requester:tasksettings';
|
||||
taskId: string;
|
||||
settings: unknown;
|
||||
}
|
||||
|
||||
export interface TaskCancel {
|
||||
type: 'requester:taskcancel';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskDataResponse {
|
||||
type: 'requester:taskdataresponse';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface RPCResponse {
|
||||
type: 'requester:rpcresponse';
|
||||
taskId: string;
|
||||
callId: string;
|
||||
status: 'success' | 'error';
|
||||
data: unknown;
|
||||
}
|
||||
|
||||
export interface TaskRequest {
|
||||
type: 'requester:taskrequest';
|
||||
requestId: string;
|
||||
taskType: string;
|
||||
}
|
||||
|
||||
export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest;
|
||||
}
|
||||
}
|
||||
|
||||
export namespace RunnerMessage {
|
||||
export namespace ToN8n {
|
||||
export interface Info {
|
||||
type: 'runner:info';
|
||||
name: string;
|
||||
types: string[];
|
||||
}
|
||||
|
||||
export interface TaskAccepted {
|
||||
type: 'runner:taskaccepted';
|
||||
taskId: string;
|
||||
}
|
||||
|
||||
export interface TaskRejected {
|
||||
type: 'runner:taskrejected';
|
||||
taskId: string;
|
||||
reason: string;
|
||||
}
|
||||
|
||||
export interface TaskDone {
|
||||
type: 'runner:taskdone';
|
||||
taskId: string;
|
||||
data: TaskResultData;
|
||||
}
|
||||
|
||||
export interface TaskError {
|
||||
type: 'runner:taskerror';
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}
|
||||
|
||||
export interface TaskOffer {
|
||||
type: 'runner:taskoffer';
|
||||
offerId: string;
|
||||
taskType: string;
|
||||
validFor: number;
|
||||
}
|
||||
|
||||
export interface TaskDataRequest {
|
||||
type: 'runner:taskdatarequest';
|
||||
taskId: string;
|
||||
requestId: string;
|
||||
requestType: DataRequestType;
|
||||
param?: string;
|
||||
}
|
||||
|
||||
export interface RPC {
|
||||
type: 'runner:rpc';
|
||||
callId: string;
|
||||
taskId: string;
|
||||
name: (typeof RPC_ALLOW_LIST)[number];
|
||||
params: unknown[];
|
||||
}
|
||||
|
||||
export type All =
|
||||
| Info
|
||||
| TaskDone
|
||||
| TaskError
|
||||
| TaskAccepted
|
||||
| TaskRejected
|
||||
| TaskOffer
|
||||
| RPC
|
||||
| TaskDataRequest;
|
||||
}
|
||||
}
|
||||
|
||||
export const RPC_ALLOW_LIST = [
|
||||
'logNodeOutput',
|
||||
'helpers.httpRequestWithAuthentication',
|
||||
'helpers.requestWithAuthenticationPaginated',
|
||||
// "helpers.normalizeItems"
|
||||
// "helpers.constructExecutionMetaData"
|
||||
// "helpers.assertBinaryData"
|
||||
'helpers.getBinaryDataBuffer',
|
||||
// "helpers.copyInputItems"
|
||||
// "helpers.returnJsonArray"
|
||||
'helpers.getSSHClient',
|
||||
'helpers.createReadStream',
|
||||
// "helpers.getStoragePath"
|
||||
'helpers.writeContentToFile',
|
||||
'helpers.prepareBinaryData',
|
||||
'helpers.setBinaryDataBuffer',
|
||||
'helpers.copyBinaryFile',
|
||||
'helpers.binaryToBuffer',
|
||||
// "helpers.binaryToString"
|
||||
// "helpers.getBinaryPath"
|
||||
'helpers.getBinaryStream',
|
||||
'helpers.getBinaryMetadata',
|
||||
'helpers.createDeferredPromise',
|
||||
'helpers.httpRequest',
|
||||
] as const;
|
||||
188
packages/cli/src/runners/runner-ws-server.ts
Normal file
188
packages/cli/src/runners/runner-ws-server.ts
Normal file
@@ -0,0 +1,188 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import type { Application } from 'express';
|
||||
import { ServerResponse, type Server } from 'http';
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
import type { Socket } from 'net';
|
||||
import Container, { Service } from 'typedi';
|
||||
import { parse as parseUrl } from 'url';
|
||||
import { Server as WSServer } from 'ws';
|
||||
import type WebSocket from 'ws';
|
||||
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
import { send } from '@/response-helper';
|
||||
import { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller';
|
||||
|
||||
import type {
|
||||
RunnerMessage,
|
||||
N8nMessage,
|
||||
TaskRunnerServerInitRequest,
|
||||
TaskRunnerServerInitResponse,
|
||||
} from './runner-types';
|
||||
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
|
||||
|
||||
function heartbeat(this: WebSocket) {
|
||||
this.isAlive = true;
|
||||
}
|
||||
|
||||
function getEndpointBasePath(restEndpoint: string) {
|
||||
const globalConfig = Container.get(GlobalConfig);
|
||||
|
||||
let path = globalConfig.taskRunners.path;
|
||||
if (path.startsWith('/')) {
|
||||
path = path.slice(1);
|
||||
}
|
||||
if (path.endsWith('/')) {
|
||||
path = path.slice(-1);
|
||||
}
|
||||
|
||||
return `/${restEndpoint}/${path}`;
|
||||
}
|
||||
|
||||
function getWsEndpoint(restEndpoint: string) {
|
||||
return `${getEndpointBasePath(restEndpoint)}/_ws`;
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class TaskRunnerService {
|
||||
runnerConnections: Record<TaskRunner['id'], WebSocket> = {};
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly taskBroker: TaskBroker,
|
||||
) {}
|
||||
|
||||
sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
|
||||
this.runnerConnections[id]?.send(JSON.stringify(message));
|
||||
}
|
||||
|
||||
add(id: TaskRunner['id'], connection: WebSocket) {
|
||||
connection.isAlive = true;
|
||||
connection.on('pong', heartbeat);
|
||||
|
||||
let isConnected = false;
|
||||
|
||||
const onMessage = (data: WebSocket.RawData) => {
|
||||
try {
|
||||
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
|
||||
|
||||
const message: RunnerMessage.ToN8n.All = JSON.parse(
|
||||
buffer.toString('utf8'),
|
||||
) as RunnerMessage.ToN8n.All;
|
||||
|
||||
if (!isConnected && message.type !== 'runner:info') {
|
||||
return;
|
||||
} else if (!isConnected && message.type === 'runner:info') {
|
||||
this.removeConnection(id);
|
||||
isConnected = true;
|
||||
|
||||
this.runnerConnections[id] = connection;
|
||||
|
||||
this.taskBroker.registerRunner(
|
||||
{
|
||||
id,
|
||||
taskTypes: message.types,
|
||||
lastSeen: new Date(),
|
||||
name: message.name,
|
||||
},
|
||||
this.sendMessage.bind(this, id) as MessageCallback,
|
||||
);
|
||||
|
||||
this.sendMessage(id, { type: 'broker:runnerregistered' });
|
||||
|
||||
this.logger.info(`Runner "${message.name}"(${id}) has been registered`);
|
||||
return;
|
||||
}
|
||||
|
||||
void this.taskBroker.onRunnerMessage(id, message);
|
||||
} catch (error) {
|
||||
this.logger.error(`Couldn't parse message from runner "${id}"`, {
|
||||
error: error as unknown,
|
||||
id,
|
||||
data,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Makes sure to remove the session if the connection is closed
|
||||
connection.once('close', () => {
|
||||
connection.off('pong', heartbeat);
|
||||
connection.off('message', onMessage);
|
||||
this.removeConnection(id);
|
||||
});
|
||||
|
||||
connection.on('message', onMessage);
|
||||
connection.send(
|
||||
JSON.stringify({ type: 'broker:inforequest' } as N8nMessage.ToRunner.InfoRequest),
|
||||
);
|
||||
}
|
||||
|
||||
removeConnection(id: TaskRunner['id']) {
|
||||
if (id in this.runnerConnections) {
|
||||
this.taskBroker.deregisterRunner(id);
|
||||
this.runnerConnections[id].close();
|
||||
delete this.runnerConnections[id];
|
||||
}
|
||||
}
|
||||
|
||||
handleRequest(req: TaskRunnerServerInitRequest, _res: TaskRunnerServerInitResponse) {
|
||||
this.add(req.query.id, req.ws);
|
||||
}
|
||||
}
|
||||
|
||||
// Checks for upgrade requests on the runners path and upgrades the connection
|
||||
// then, passes the request back to the app to handle the routing
|
||||
export const setupRunnerServer = (restEndpoint: string, server: Server, app: Application) => {
|
||||
const globalConfig = Container.get(GlobalConfig);
|
||||
const { authToken } = globalConfig.taskRunners;
|
||||
|
||||
if (!authToken) {
|
||||
throw new ApplicationError(
|
||||
'Authentication token must be configured when task runners are enabled. Use N8N_RUNNERS_AUTH_TOKEN environment variable to set it.',
|
||||
);
|
||||
}
|
||||
|
||||
const endpoint = getWsEndpoint(restEndpoint);
|
||||
const wsServer = new WSServer({ noServer: true });
|
||||
server.on('upgrade', (request: TaskRunnerServerInitRequest, socket: Socket, head) => {
|
||||
if (parseUrl(request.url).pathname !== endpoint) {
|
||||
// We can't close the connection here since the Push connections
|
||||
// are using the same HTTP server and upgrade requests and this
|
||||
// gets triggered for both
|
||||
return;
|
||||
}
|
||||
|
||||
wsServer.handleUpgrade(request, socket, head, (ws) => {
|
||||
request.ws = ws;
|
||||
|
||||
const response = new ServerResponse(request);
|
||||
response.writeHead = (statusCode) => {
|
||||
if (statusCode > 200) ws.close();
|
||||
return response;
|
||||
};
|
||||
|
||||
// @ts-expect-error Hidden API?
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
||||
app.handle(request, response);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
export const setupRunnerHandler = (restEndpoint: string, app: Application) => {
|
||||
const wsEndpoint = getWsEndpoint(restEndpoint);
|
||||
const authEndpoint = `${getEndpointBasePath(restEndpoint)}/auth`;
|
||||
|
||||
const taskRunnerAuthController = Container.get(TaskRunnerAuthController);
|
||||
const taskRunnerService = Container.get(TaskRunnerService);
|
||||
app.use(
|
||||
wsEndpoint,
|
||||
// eslint-disable-next-line @typescript-eslint/unbound-method
|
||||
taskRunnerAuthController.authMiddleware,
|
||||
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) =>
|
||||
taskRunnerService.handleRequest(req, res),
|
||||
);
|
||||
|
||||
app.post(
|
||||
authEndpoint,
|
||||
send(async (req) => await taskRunnerAuthController.createGrantToken(req)),
|
||||
);
|
||||
};
|
||||
553
packages/cli/src/runners/task-broker.service.ts
Normal file
553
packages/cli/src/runners/task-broker.service.ts
Normal file
@@ -0,0 +1,553 @@
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
|
||||
import { TaskRejectError } from './errors';
|
||||
import type { N8nMessage, RunnerMessage, RequesterMessage, TaskResultData } from './runner-types';
|
||||
|
||||
export interface TaskRunner {
|
||||
id: string;
|
||||
name?: string;
|
||||
taskTypes: string[];
|
||||
lastSeen: Date;
|
||||
}
|
||||
|
||||
export interface Task {
|
||||
id: string;
|
||||
runnerId: TaskRunner['id'];
|
||||
requesterId: string;
|
||||
taskType: string;
|
||||
}
|
||||
|
||||
export interface TaskOffer {
|
||||
offerId: string;
|
||||
runnerId: TaskRunner['id'];
|
||||
taskType: string;
|
||||
validFor: number;
|
||||
validUntil: bigint;
|
||||
}
|
||||
|
||||
export interface TaskRequest {
|
||||
requestId: string;
|
||||
requesterId: string;
|
||||
taskType: string;
|
||||
|
||||
acceptInProgress?: boolean;
|
||||
}
|
||||
|
||||
export type MessageCallback = (message: N8nMessage.ToRunner.All) => Promise<void> | void;
|
||||
export type RequesterMessageCallback = (
|
||||
message: N8nMessage.ToRequester.All,
|
||||
) => Promise<void> | void;
|
||||
|
||||
type RunnerAcceptCallback = () => void;
|
||||
type RequesterAcceptCallback = (settings: RequesterMessage.ToN8n.TaskSettings['settings']) => void;
|
||||
type TaskRejectCallback = (reason: TaskRejectError) => void;
|
||||
|
||||
@Service()
|
||||
export class TaskBroker {
|
||||
private knownRunners: Map<
|
||||
TaskRunner['id'],
|
||||
{ runner: TaskRunner; messageCallback: MessageCallback }
|
||||
> = new Map();
|
||||
|
||||
private requesters: Map<string, RequesterMessageCallback> = new Map();
|
||||
|
||||
private tasks: Map<Task['id'], Task> = new Map();
|
||||
|
||||
private runnerAcceptRejects: Map<
|
||||
Task['id'],
|
||||
{ accept: RunnerAcceptCallback; reject: TaskRejectCallback }
|
||||
> = new Map();
|
||||
|
||||
private requesterAcceptRejects: Map<
|
||||
Task['id'],
|
||||
{ accept: RequesterAcceptCallback; reject: TaskRejectCallback }
|
||||
> = new Map();
|
||||
|
||||
private pendingTaskOffers: TaskOffer[] = [];
|
||||
|
||||
private pendingTaskRequests: TaskRequest[] = [];
|
||||
|
||||
constructor(private readonly logger: Logger) {}
|
||||
|
||||
expireTasks() {
|
||||
const now = process.hrtime.bigint();
|
||||
const invalidOffers: number[] = [];
|
||||
for (let i = 0; i < this.pendingTaskOffers.length; i++) {
|
||||
if (this.pendingTaskOffers[i].validUntil < now) {
|
||||
invalidOffers.push(i);
|
||||
}
|
||||
}
|
||||
|
||||
// We reverse the list so the later indexes are valid after deleting earlier ones
|
||||
invalidOffers.reverse().forEach((i) => this.pendingTaskOffers.splice(i, 1));
|
||||
}
|
||||
|
||||
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
|
||||
this.knownRunners.set(runner.id, { runner, messageCallback });
|
||||
}
|
||||
|
||||
deregisterRunner(runnerId: string) {
|
||||
this.knownRunners.delete(runnerId);
|
||||
}
|
||||
|
||||
registerRequester(requesterId: string, messageCallback: RequesterMessageCallback) {
|
||||
this.requesters.set(requesterId, messageCallback);
|
||||
}
|
||||
|
||||
deregisterRequester(requesterId: string) {
|
||||
this.requesters.delete(requesterId);
|
||||
}
|
||||
|
||||
private async messageRunner(runnerId: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
|
||||
await this.knownRunners.get(runnerId)?.messageCallback(message);
|
||||
}
|
||||
|
||||
private async messageRequester(requesterId: string, message: N8nMessage.ToRequester.All) {
|
||||
await this.requesters.get(requesterId)?.(message);
|
||||
}
|
||||
|
||||
async onRunnerMessage(runnerId: TaskRunner['id'], message: RunnerMessage.ToN8n.All) {
|
||||
const runner = this.knownRunners.get(runnerId);
|
||||
if (!runner) {
|
||||
return;
|
||||
}
|
||||
switch (message.type) {
|
||||
case 'runner:taskaccepted':
|
||||
this.handleRunnerAccept(message.taskId);
|
||||
break;
|
||||
case 'runner:taskrejected':
|
||||
this.handleRunnerReject(message.taskId, message.reason);
|
||||
break;
|
||||
case 'runner:taskoffer':
|
||||
this.taskOffered({
|
||||
runnerId,
|
||||
taskType: message.taskType,
|
||||
offerId: message.offerId,
|
||||
validFor: message.validFor,
|
||||
validUntil: process.hrtime.bigint() + BigInt(message.validFor * 1_000_000),
|
||||
});
|
||||
break;
|
||||
case 'runner:taskdone':
|
||||
await this.taskDoneHandler(message.taskId, message.data);
|
||||
break;
|
||||
case 'runner:taskerror':
|
||||
await this.taskErrorHandler(message.taskId, message.error);
|
||||
break;
|
||||
case 'runner:taskdatarequest':
|
||||
await this.handleDataRequest(
|
||||
message.taskId,
|
||||
message.requestId,
|
||||
message.requestType,
|
||||
message.param,
|
||||
);
|
||||
break;
|
||||
|
||||
case 'runner:rpc':
|
||||
await this.handleRpcRequest(message.taskId, message.callId, message.name, message.params);
|
||||
break;
|
||||
// Already handled
|
||||
case 'runner:info':
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async handleRpcRequest(
|
||||
taskId: Task['id'],
|
||||
callId: string,
|
||||
name: RunnerMessage.ToN8n.RPC['name'],
|
||||
params: unknown[],
|
||||
) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
await this.messageRequester(task.requesterId, {
|
||||
type: 'broker:rpc',
|
||||
taskId,
|
||||
callId,
|
||||
name,
|
||||
params,
|
||||
});
|
||||
}
|
||||
|
||||
handleRunnerAccept(taskId: Task['id']) {
|
||||
const acceptReject = this.runnerAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.accept();
|
||||
this.runnerAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
handleRunnerReject(taskId: Task['id'], reason: string) {
|
||||
const acceptReject = this.runnerAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.reject(new TaskRejectError(reason));
|
||||
this.runnerAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
async handleDataRequest(
|
||||
taskId: Task['id'],
|
||||
requestId: RunnerMessage.ToN8n.TaskDataRequest['requestId'],
|
||||
requestType: RunnerMessage.ToN8n.TaskDataRequest['requestType'],
|
||||
param?: string,
|
||||
) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
await this.messageRequester(task.requesterId, {
|
||||
type: 'broker:taskdatarequest',
|
||||
taskId,
|
||||
requestId,
|
||||
requestType,
|
||||
param,
|
||||
});
|
||||
}
|
||||
|
||||
async handleResponse(
|
||||
taskId: Task['id'],
|
||||
requestId: RunnerMessage.ToN8n.TaskDataRequest['requestId'],
|
||||
data: unknown,
|
||||
) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
await this.messageRunner(task.requesterId, {
|
||||
type: 'broker:taskdataresponse',
|
||||
taskId,
|
||||
requestId,
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
async onRequesterMessage(requesterId: string, message: RequesterMessage.ToN8n.All) {
|
||||
switch (message.type) {
|
||||
case 'requester:tasksettings':
|
||||
this.handleRequesterAccept(message.taskId, message.settings);
|
||||
break;
|
||||
case 'requester:taskcancel':
|
||||
await this.cancelTask(message.taskId, message.reason);
|
||||
break;
|
||||
case 'requester:taskrequest':
|
||||
this.taskRequested({
|
||||
taskType: message.taskType,
|
||||
requestId: message.requestId,
|
||||
requesterId,
|
||||
});
|
||||
break;
|
||||
case 'requester:taskdataresponse':
|
||||
await this.handleRequesterDataResponse(message.taskId, message.requestId, message.data);
|
||||
break;
|
||||
case 'requester:rpcresponse':
|
||||
await this.handleRequesterRpcResponse(
|
||||
message.taskId,
|
||||
message.callId,
|
||||
message.status,
|
||||
message.data,
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async handleRequesterRpcResponse(
|
||||
taskId: string,
|
||||
callId: string,
|
||||
status: RequesterMessage.ToN8n.RPCResponse['status'],
|
||||
data: unknown,
|
||||
) {
|
||||
const runner = await this.getRunnerOrFailTask(taskId);
|
||||
await this.messageRunner(runner.id, {
|
||||
type: 'broker:rpcresponse',
|
||||
taskId,
|
||||
callId,
|
||||
status,
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
async handleRequesterDataResponse(taskId: Task['id'], requestId: string, data: unknown) {
|
||||
const runner = await this.getRunnerOrFailTask(taskId);
|
||||
|
||||
await this.messageRunner(runner.id, {
|
||||
type: 'broker:taskdataresponse',
|
||||
taskId,
|
||||
requestId,
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
handleRequesterAccept(
|
||||
taskId: Task['id'],
|
||||
settings: RequesterMessage.ToN8n.TaskSettings['settings'],
|
||||
) {
|
||||
const acceptReject = this.requesterAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.accept(settings);
|
||||
this.requesterAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
handleRequesterReject(taskId: Task['id'], reason: string) {
|
||||
const acceptReject = this.requesterAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.reject(new TaskRejectError(reason));
|
||||
this.requesterAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
private async cancelTask(taskId: Task['id'], reason: string) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
this.tasks.delete(taskId);
|
||||
|
||||
await this.messageRunner(task.runnerId, {
|
||||
type: 'broker:taskcancel',
|
||||
taskId,
|
||||
reason,
|
||||
});
|
||||
}
|
||||
|
||||
private async failTask(taskId: Task['id'], reason: string) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
this.tasks.delete(taskId);
|
||||
// TODO: special message type?
|
||||
await this.messageRequester(task.requesterId, {
|
||||
type: 'broker:taskerror',
|
||||
taskId,
|
||||
error: reason,
|
||||
});
|
||||
}
|
||||
|
||||
private async getRunnerOrFailTask(taskId: Task['id']): Promise<TaskRunner> {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
throw new ApplicationError(`Cannot find runner, failed to find task (${taskId})`, {
|
||||
level: 'error',
|
||||
});
|
||||
}
|
||||
const runner = this.knownRunners.get(task.runnerId);
|
||||
if (!runner) {
|
||||
const reason = `Cannot find runner, failed to find runner (${task.runnerId})`;
|
||||
await this.failTask(taskId, reason);
|
||||
throw new ApplicationError(reason, {
|
||||
level: 'error',
|
||||
});
|
||||
}
|
||||
return runner.runner;
|
||||
}
|
||||
|
||||
async sendTaskSettings(taskId: Task['id'], settings: unknown) {
|
||||
const runner = await this.getRunnerOrFailTask(taskId);
|
||||
await this.messageRunner(runner.id, {
|
||||
type: 'broker:tasksettings',
|
||||
taskId,
|
||||
settings,
|
||||
});
|
||||
}
|
||||
|
||||
async taskDoneHandler(taskId: Task['id'], data: TaskResultData) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
await this.requesters.get(task.requesterId)?.({
|
||||
type: 'broker:taskdone',
|
||||
taskId: task.id,
|
||||
data,
|
||||
});
|
||||
this.tasks.delete(task.id);
|
||||
}
|
||||
|
||||
async taskErrorHandler(taskId: Task['id'], error: unknown) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
await this.requesters.get(task.requesterId)?.({
|
||||
type: 'broker:taskerror',
|
||||
taskId: task.id,
|
||||
error,
|
||||
});
|
||||
this.tasks.delete(task.id);
|
||||
}
|
||||
|
||||
async acceptOffer(offer: TaskOffer, request: TaskRequest): Promise<void> {
|
||||
const taskId = nanoid(8);
|
||||
|
||||
try {
|
||||
const acceptPromise = new Promise((resolve, reject) => {
|
||||
this.runnerAcceptRejects.set(taskId, { accept: resolve as () => void, reject });
|
||||
|
||||
// TODO: customisable timeout
|
||||
setTimeout(() => {
|
||||
reject('Runner timed out');
|
||||
}, 2000);
|
||||
});
|
||||
|
||||
await this.messageRunner(offer.runnerId, {
|
||||
type: 'broker:taskofferaccept',
|
||||
offerId: offer.offerId,
|
||||
taskId,
|
||||
});
|
||||
|
||||
await acceptPromise;
|
||||
} catch (e) {
|
||||
request.acceptInProgress = false;
|
||||
if (e instanceof TaskRejectError) {
|
||||
this.logger.info(`Task (${taskId}) rejected by Runner with reason "${e.reason}"`);
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
const task: Task = {
|
||||
id: taskId,
|
||||
taskType: offer.taskType,
|
||||
runnerId: offer.runnerId,
|
||||
requesterId: request.requesterId,
|
||||
};
|
||||
|
||||
this.tasks.set(taskId, task);
|
||||
const requestIndex = this.pendingTaskRequests.findIndex(
|
||||
(r) => r.requestId === request.requestId,
|
||||
);
|
||||
if (requestIndex === -1) {
|
||||
this.logger.error(
|
||||
`Failed to find task request (${request.requestId}) after a task was accepted. This shouldn't happen, and might be a race condition.`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
this.pendingTaskRequests.splice(requestIndex, 1);
|
||||
|
||||
try {
|
||||
const acceptPromise = new Promise<RequesterMessage.ToN8n.TaskSettings['settings']>(
|
||||
(resolve, reject) => {
|
||||
this.requesterAcceptRejects.set(taskId, {
|
||||
accept: resolve as (settings: RequesterMessage.ToN8n.TaskSettings['settings']) => void,
|
||||
reject,
|
||||
});
|
||||
|
||||
// TODO: customisable timeout
|
||||
setTimeout(() => {
|
||||
reject('Requester timed out');
|
||||
}, 2000);
|
||||
},
|
||||
);
|
||||
|
||||
await this.messageRequester(request.requesterId, {
|
||||
type: 'broker:taskready',
|
||||
requestId: request.requestId,
|
||||
taskId,
|
||||
});
|
||||
|
||||
const settings = await acceptPromise;
|
||||
await this.sendTaskSettings(task.id, settings);
|
||||
} catch (e) {
|
||||
if (e instanceof TaskRejectError) {
|
||||
await this.cancelTask(task.id, e.reason);
|
||||
this.logger.info(`Task (${taskId}) rejected by Requester with reason "${e.reason}"`);
|
||||
return;
|
||||
}
|
||||
await this.cancelTask(task.id, 'Unknown reason');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// Find matching task offers and requests, then let the runner
|
||||
// know that an offer has been accepted
|
||||
//
|
||||
// *DO NOT MAKE THIS FUNCTION ASYNC*
|
||||
// This function relies on never yielding.
|
||||
// If you need to make this function async, you'll need to
|
||||
// implement some kind of locking for the requests and task
|
||||
// lists
|
||||
settleTasks() {
|
||||
this.expireTasks();
|
||||
|
||||
for (const request of this.pendingTaskRequests) {
|
||||
if (request.acceptInProgress) {
|
||||
continue;
|
||||
}
|
||||
const offerIndex = this.pendingTaskOffers.findIndex((o) => o.taskType === request.taskType);
|
||||
if (offerIndex === -1) {
|
||||
continue;
|
||||
}
|
||||
const offer = this.pendingTaskOffers[offerIndex];
|
||||
|
||||
request.acceptInProgress = true;
|
||||
this.pendingTaskOffers.splice(offerIndex, 1);
|
||||
|
||||
void this.acceptOffer(offer, request);
|
||||
}
|
||||
}
|
||||
|
||||
taskRequested(request: TaskRequest) {
|
||||
this.pendingTaskRequests.push(request);
|
||||
this.settleTasks();
|
||||
}
|
||||
|
||||
taskOffered(offer: TaskOffer) {
|
||||
this.pendingTaskOffers.push(offer);
|
||||
this.settleTasks();
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing only
|
||||
*/
|
||||
|
||||
getTasks() {
|
||||
return this.tasks;
|
||||
}
|
||||
|
||||
getPendingTaskOffers() {
|
||||
return this.pendingTaskOffers;
|
||||
}
|
||||
|
||||
getPendingTaskRequests() {
|
||||
return this.pendingTaskRequests;
|
||||
}
|
||||
|
||||
getKnownRunners() {
|
||||
return this.knownRunners;
|
||||
}
|
||||
|
||||
getKnownRequesters() {
|
||||
return this.requesters;
|
||||
}
|
||||
|
||||
getRunnerAcceptRejects() {
|
||||
return this.runnerAcceptRejects;
|
||||
}
|
||||
|
||||
setTasks(tasks: Record<string, Task>) {
|
||||
this.tasks = new Map(Object.entries(tasks));
|
||||
}
|
||||
|
||||
setPendingTaskOffers(pendingTaskOffers: TaskOffer[]) {
|
||||
this.pendingTaskOffers = pendingTaskOffers;
|
||||
}
|
||||
|
||||
setPendingTaskRequests(pendingTaskRequests: TaskRequest[]) {
|
||||
this.pendingTaskRequests = pendingTaskRequests;
|
||||
}
|
||||
|
||||
setRunnerAcceptRejects(
|
||||
runnerAcceptRejects: Record<
|
||||
string,
|
||||
{ accept: RunnerAcceptCallback; reject: TaskRejectCallback }
|
||||
>,
|
||||
) {
|
||||
this.runnerAcceptRejects = new Map(Object.entries(runnerAcceptRejects));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import Container from 'typedi';
|
||||
|
||||
import { TaskManager } from './task-manager';
|
||||
import type { RequesterMessage } from '../runner-types';
|
||||
import type { RequesterMessageCallback } from '../task-broker.service';
|
||||
import { TaskBroker } from '../task-broker.service';
|
||||
|
||||
export class SingleMainTaskManager extends TaskManager {
|
||||
taskBroker: TaskBroker;
|
||||
|
||||
id: string = 'single-main';
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
this.registerRequester();
|
||||
}
|
||||
|
||||
registerRequester() {
|
||||
this.taskBroker = Container.get(TaskBroker);
|
||||
|
||||
this.taskBroker.registerRequester(
|
||||
this.id,
|
||||
this.onMessage.bind(this) as RequesterMessageCallback,
|
||||
);
|
||||
}
|
||||
|
||||
sendMessage(message: RequesterMessage.ToN8n.All) {
|
||||
void this.taskBroker.onRequesterMessage(this.id, message);
|
||||
}
|
||||
}
|
||||
410
packages/cli/src/runners/task-managers/task-manager.ts
Normal file
410
packages/cli/src/runners/task-managers/task-manager.ts
Normal file
@@ -0,0 +1,410 @@
|
||||
import {
|
||||
type IExecuteFunctions,
|
||||
type Workflow,
|
||||
type IRunExecutionData,
|
||||
type INodeExecutionData,
|
||||
type ITaskDataConnections,
|
||||
type INode,
|
||||
type WorkflowParameters,
|
||||
type INodeParameters,
|
||||
type WorkflowExecuteMode,
|
||||
type IExecuteData,
|
||||
type IDataObject,
|
||||
type IWorkflowExecuteAdditionalData,
|
||||
} from 'n8n-workflow';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import { TaskError } from '@/runners/errors';
|
||||
|
||||
import {
|
||||
RPC_ALLOW_LIST,
|
||||
type TaskResultData,
|
||||
type N8nMessage,
|
||||
type RequesterMessage,
|
||||
} from '../runner-types';
|
||||
|
||||
export type RequestAccept = (jobId: string) => void;
|
||||
export type RequestReject = (reason: string) => void;
|
||||
|
||||
export type TaskAccept = (data: TaskResultData) => void;
|
||||
export type TaskReject = (error: unknown) => void;
|
||||
|
||||
export interface TaskData {
|
||||
executeFunctions: IExecuteFunctions;
|
||||
inputData: ITaskDataConnections;
|
||||
node: INode;
|
||||
|
||||
workflow: Workflow;
|
||||
runExecutionData: IRunExecutionData;
|
||||
runIndex: number;
|
||||
itemIndex: number;
|
||||
activeNodeName: string;
|
||||
connectionInputData: INodeExecutionData[];
|
||||
siblingParameters: INodeParameters;
|
||||
mode: WorkflowExecuteMode;
|
||||
executeData?: IExecuteData;
|
||||
defaultReturnRunIndex: number;
|
||||
selfData: IDataObject;
|
||||
contextNodeName: string;
|
||||
additionalData: IWorkflowExecuteAdditionalData;
|
||||
}
|
||||
|
||||
export interface PartialAdditionalData {
|
||||
executionId?: string;
|
||||
restartExecutionId?: string;
|
||||
restApiUrl: string;
|
||||
instanceBaseUrl: string;
|
||||
formWaitingBaseUrl: string;
|
||||
webhookBaseUrl: string;
|
||||
webhookWaitingBaseUrl: string;
|
||||
webhookTestBaseUrl: string;
|
||||
currentNodeParameters?: INodeParameters;
|
||||
executionTimeoutTimestamp?: number;
|
||||
userId?: string;
|
||||
variables: IDataObject;
|
||||
}
|
||||
|
||||
export interface AllCodeTaskData {
|
||||
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
|
||||
inputData: ITaskDataConnections;
|
||||
node: INode;
|
||||
|
||||
runExecutionData: IRunExecutionData;
|
||||
runIndex: number;
|
||||
itemIndex: number;
|
||||
activeNodeName: string;
|
||||
connectionInputData: INodeExecutionData[];
|
||||
siblingParameters: INodeParameters;
|
||||
mode: WorkflowExecuteMode;
|
||||
executeData?: IExecuteData;
|
||||
defaultReturnRunIndex: number;
|
||||
selfData: IDataObject;
|
||||
contextNodeName: string;
|
||||
additionalData: PartialAdditionalData;
|
||||
}
|
||||
|
||||
export interface TaskRequest {
|
||||
requestId: string;
|
||||
taskType: string;
|
||||
settings: unknown;
|
||||
data: TaskData;
|
||||
}
|
||||
|
||||
export interface Task {
|
||||
taskId: string;
|
||||
settings: unknown;
|
||||
data: TaskData;
|
||||
}
|
||||
|
||||
interface ExecuteFunctionObject {
|
||||
[name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject;
|
||||
}
|
||||
|
||||
const workflowToParameters = (workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> => {
|
||||
return {
|
||||
id: workflow.id,
|
||||
name: workflow.name,
|
||||
active: workflow.active,
|
||||
connections: workflow.connectionsBySourceNode,
|
||||
nodes: Object.values(workflow.nodes),
|
||||
pinData: workflow.pinData,
|
||||
settings: workflow.settings,
|
||||
staticData: workflow.staticData,
|
||||
};
|
||||
};
|
||||
|
||||
export class TaskManager {
|
||||
requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map();
|
||||
|
||||
taskAcceptRejects: Map<string, { accept: TaskAccept; reject: TaskReject }> = new Map();
|
||||
|
||||
pendingRequests: Map<string, TaskRequest> = new Map();
|
||||
|
||||
tasks: Map<string, Task> = new Map();
|
||||
|
||||
async startTask<T>(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
taskType: string,
|
||||
settings: unknown,
|
||||
executeFunctions: IExecuteFunctions,
|
||||
inputData: ITaskDataConnections,
|
||||
node: INode,
|
||||
workflow: Workflow,
|
||||
runExecutionData: IRunExecutionData,
|
||||
runIndex: number,
|
||||
itemIndex: number,
|
||||
activeNodeName: string,
|
||||
connectionInputData: INodeExecutionData[],
|
||||
siblingParameters: INodeParameters,
|
||||
mode: WorkflowExecuteMode,
|
||||
executeData?: IExecuteData,
|
||||
defaultReturnRunIndex = -1,
|
||||
selfData: IDataObject = {},
|
||||
contextNodeName: string = activeNodeName,
|
||||
): Promise<T> {
|
||||
const data: TaskData = {
|
||||
workflow,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
connectionInputData,
|
||||
inputData,
|
||||
node,
|
||||
executeFunctions,
|
||||
itemIndex,
|
||||
siblingParameters,
|
||||
mode,
|
||||
executeData,
|
||||
defaultReturnRunIndex,
|
||||
selfData,
|
||||
contextNodeName,
|
||||
activeNodeName,
|
||||
additionalData,
|
||||
};
|
||||
|
||||
const request: TaskRequest = {
|
||||
requestId: nanoid(),
|
||||
taskType,
|
||||
settings,
|
||||
data,
|
||||
};
|
||||
|
||||
this.pendingRequests.set(request.requestId, request);
|
||||
|
||||
const taskIdPromise = new Promise<string>((resolve, reject) => {
|
||||
this.requestAcceptRejects.set(request.requestId, {
|
||||
accept: resolve,
|
||||
reject,
|
||||
});
|
||||
});
|
||||
|
||||
this.sendMessage({
|
||||
type: 'requester:taskrequest',
|
||||
requestId: request.requestId,
|
||||
taskType,
|
||||
});
|
||||
|
||||
const taskId = await taskIdPromise;
|
||||
|
||||
const task: Task = {
|
||||
taskId,
|
||||
data,
|
||||
settings,
|
||||
};
|
||||
this.tasks.set(task.taskId, task);
|
||||
|
||||
try {
|
||||
const dataPromise = new Promise<TaskResultData>((resolve, reject) => {
|
||||
this.taskAcceptRejects.set(task.taskId, {
|
||||
accept: resolve,
|
||||
reject,
|
||||
});
|
||||
});
|
||||
|
||||
this.sendMessage({
|
||||
type: 'requester:tasksettings',
|
||||
taskId,
|
||||
settings,
|
||||
});
|
||||
|
||||
const resultData = await dataPromise;
|
||||
// Set custom execution data (`$execution.customData`) if sent
|
||||
if (resultData.customData) {
|
||||
Object.entries(resultData.customData).forEach(([k, v]) => {
|
||||
if (!runExecutionData.resultData.metadata) {
|
||||
runExecutionData.resultData.metadata = {};
|
||||
}
|
||||
runExecutionData.resultData.metadata[k] = v;
|
||||
});
|
||||
}
|
||||
return resultData.result as T;
|
||||
} catch (e) {
|
||||
if (typeof e === 'string') {
|
||||
throw new TaskError(e, {
|
||||
level: 'error',
|
||||
});
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
this.tasks.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
sendMessage(_message: RequesterMessage.ToN8n.All) {}
|
||||
|
||||
onMessage(message: N8nMessage.ToRequester.All) {
|
||||
switch (message.type) {
|
||||
case 'broker:taskready':
|
||||
this.taskReady(message.requestId, message.taskId);
|
||||
break;
|
||||
case 'broker:taskdone':
|
||||
this.taskDone(message.taskId, message.data);
|
||||
break;
|
||||
case 'broker:taskerror':
|
||||
this.taskError(message.taskId, message.error);
|
||||
break;
|
||||
case 'broker:taskdatarequest':
|
||||
this.sendTaskData(message.taskId, message.requestId, message.requestType);
|
||||
break;
|
||||
case 'broker:rpc':
|
||||
void this.handleRpc(message.taskId, message.callId, message.name, message.params);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taskReady(requestId: string, taskId: string) {
|
||||
const acceptReject = this.requestAcceptRejects.get(requestId);
|
||||
if (!acceptReject) {
|
||||
this.rejectTask(
|
||||
taskId,
|
||||
'Request ID not found. In multi-main setup, it is possible for one of the mains to have reported ready state already.',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
acceptReject.accept(taskId);
|
||||
this.requestAcceptRejects.delete(requestId);
|
||||
}
|
||||
|
||||
rejectTask(jobId: string, reason: string) {
|
||||
this.sendMessage({
|
||||
type: 'requester:taskcancel',
|
||||
taskId: jobId,
|
||||
reason,
|
||||
});
|
||||
}
|
||||
|
||||
taskDone(taskId: string, data: TaskResultData) {
|
||||
const acceptReject = this.taskAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.accept(data);
|
||||
this.taskAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
taskError(taskId: string, error: unknown) {
|
||||
const acceptReject = this.taskAcceptRejects.get(taskId);
|
||||
if (acceptReject) {
|
||||
acceptReject.reject(error);
|
||||
this.taskAcceptRejects.delete(taskId);
|
||||
}
|
||||
}
|
||||
|
||||
sendTaskData(
|
||||
taskId: string,
|
||||
requestId: string,
|
||||
requestType: N8nMessage.ToRequester.TaskDataRequest['requestType'],
|
||||
) {
|
||||
const job = this.tasks.get(taskId);
|
||||
if (!job) {
|
||||
// TODO: logging
|
||||
return;
|
||||
}
|
||||
if (requestType === 'all') {
|
||||
const jd = job.data;
|
||||
const ad = jd.additionalData;
|
||||
const data: AllCodeTaskData = {
|
||||
workflow: workflowToParameters(jd.workflow),
|
||||
connectionInputData: jd.connectionInputData,
|
||||
inputData: jd.inputData,
|
||||
itemIndex: jd.itemIndex,
|
||||
activeNodeName: jd.activeNodeName,
|
||||
contextNodeName: jd.contextNodeName,
|
||||
defaultReturnRunIndex: jd.defaultReturnRunIndex,
|
||||
mode: jd.mode,
|
||||
node: jd.node,
|
||||
runExecutionData: jd.runExecutionData,
|
||||
runIndex: jd.runIndex,
|
||||
selfData: jd.selfData,
|
||||
siblingParameters: jd.siblingParameters,
|
||||
executeData: jd.executeData,
|
||||
additionalData: {
|
||||
formWaitingBaseUrl: ad.formWaitingBaseUrl,
|
||||
instanceBaseUrl: ad.instanceBaseUrl,
|
||||
restApiUrl: ad.restApiUrl,
|
||||
variables: ad.variables,
|
||||
webhookBaseUrl: ad.webhookBaseUrl,
|
||||
webhookTestBaseUrl: ad.webhookTestBaseUrl,
|
||||
webhookWaitingBaseUrl: ad.webhookWaitingBaseUrl,
|
||||
currentNodeParameters: ad.currentNodeParameters,
|
||||
executionId: ad.executionId,
|
||||
executionTimeoutTimestamp: ad.executionTimeoutTimestamp,
|
||||
restartExecutionId: ad.restartExecutionId,
|
||||
userId: ad.userId,
|
||||
},
|
||||
};
|
||||
this.sendMessage({
|
||||
type: 'requester:taskdataresponse',
|
||||
taskId,
|
||||
requestId,
|
||||
data,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async handleRpc(
|
||||
taskId: string,
|
||||
callId: string,
|
||||
name: N8nMessage.ToRequester.RPC['name'],
|
||||
params: unknown[],
|
||||
) {
|
||||
const job = this.tasks.get(taskId);
|
||||
if (!job) {
|
||||
// TODO: logging
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!RPC_ALLOW_LIST.includes(name)) {
|
||||
this.sendMessage({
|
||||
type: 'requester:rpcresponse',
|
||||
taskId,
|
||||
callId,
|
||||
status: 'error',
|
||||
data: 'Method not allowed',
|
||||
});
|
||||
return;
|
||||
}
|
||||
const splitPath = name.split('.');
|
||||
|
||||
const funcs = job.data.executeFunctions;
|
||||
|
||||
let func: ((...args: unknown[]) => Promise<unknown>) | undefined = undefined;
|
||||
let funcObj: ExecuteFunctionObject[string] | undefined =
|
||||
funcs as unknown as ExecuteFunctionObject;
|
||||
for (const part of splitPath) {
|
||||
funcObj = (funcObj as ExecuteFunctionObject)[part] ?? undefined;
|
||||
if (!funcObj) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
func = funcObj as unknown as (...args: unknown[]) => Promise<unknown>;
|
||||
if (!func) {
|
||||
this.sendMessage({
|
||||
type: 'requester:rpcresponse',
|
||||
taskId,
|
||||
callId,
|
||||
status: 'error',
|
||||
data: 'Could not find method',
|
||||
});
|
||||
return;
|
||||
}
|
||||
const data = (await func.call(funcs, ...params)) as unknown;
|
||||
|
||||
this.sendMessage({
|
||||
type: 'requester:rpcresponse',
|
||||
taskId,
|
||||
callId,
|
||||
status: 'success',
|
||||
data,
|
||||
});
|
||||
} catch (e) {
|
||||
this.sendMessage({
|
||||
type: 'requester:rpcresponse',
|
||||
taskId,
|
||||
callId,
|
||||
status: 'error',
|
||||
data: e,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ import { isApiEnabled, loadPublicApiVersions } from '@/public-api';
|
||||
import { setupPushServer, setupPushHandler, Push } from '@/push';
|
||||
import type { APIRequest } from '@/requests';
|
||||
import * as ResponseHelper from '@/response-helper';
|
||||
import { setupRunnerServer, setupRunnerHandler } from '@/runners/runner-ws-server';
|
||||
import type { FrontendService } from '@/services/frontend.service';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
|
||||
@@ -201,6 +202,10 @@ export class Server extends AbstractServer {
|
||||
const { restEndpoint, app } = this;
|
||||
setupPushHandler(restEndpoint, app);
|
||||
|
||||
if (!this.globalConfig.taskRunners.disabled) {
|
||||
setupRunnerHandler(restEndpoint, app);
|
||||
}
|
||||
|
||||
const push = Container.get(Push);
|
||||
if (push.isBidirectional) {
|
||||
const { CollaborationService } = await import('@/collaboration/collaboration.service');
|
||||
@@ -400,4 +405,9 @@ export class Server extends AbstractServer {
|
||||
const { restEndpoint, server, app } = this;
|
||||
setupPushServer(restEndpoint, server, app);
|
||||
}
|
||||
|
||||
protected setupRunnerServer(): void {
|
||||
const { restEndpoint, server, app } = this;
|
||||
setupRunnerServer(restEndpoint, server, app);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,9 @@ export class CacheService extends TypedEmitter<CacheEvents> {
|
||||
// storing
|
||||
// ----------------------------------
|
||||
|
||||
/**
|
||||
* @param ttl Time to live in milliseconds
|
||||
*/
|
||||
async set(key: string, value: unknown, ttl?: number) {
|
||||
if (!this.cache) await this.init();
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ import type {
|
||||
WorkflowExecuteMode,
|
||||
ExecutionStatus,
|
||||
ExecutionError,
|
||||
IExecuteFunctions,
|
||||
ITaskDataConnections,
|
||||
ExecuteWorkflowOptions,
|
||||
IWorkflowExecutionDataProcess,
|
||||
} from 'n8n-workflow';
|
||||
@@ -64,6 +66,7 @@ import {
|
||||
} from './execution-lifecycle-hooks/shared/shared-hook-functions';
|
||||
import { toSaveSettings } from './execution-lifecycle-hooks/to-save-settings';
|
||||
import { Logger } from './logging/logger.service';
|
||||
import { TaskManager } from './runners/task-managers/task-manager';
|
||||
import { SecretsHelper } from './secrets-helpers';
|
||||
import { OwnershipService } from './services/ownership.service';
|
||||
import { UrlService } from './services/url.service';
|
||||
@@ -984,6 +987,47 @@ export async function getBase(
|
||||
setExecutionStatus,
|
||||
variables,
|
||||
secretsHelpers: Container.get(SecretsHelper),
|
||||
async startAgentJob(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
jobType: string,
|
||||
settings: unknown,
|
||||
executeFunctions: IExecuteFunctions,
|
||||
inputData: ITaskDataConnections,
|
||||
node: INode,
|
||||
workflow: Workflow,
|
||||
runExecutionData: IRunExecutionData,
|
||||
runIndex: number,
|
||||
itemIndex: number,
|
||||
activeNodeName: string,
|
||||
connectionInputData: INodeExecutionData[],
|
||||
siblingParameters: INodeParameters,
|
||||
mode: WorkflowExecuteMode,
|
||||
executeData?: IExecuteData,
|
||||
defaultReturnRunIndex?: number,
|
||||
selfData?: IDataObject,
|
||||
contextNodeName?: string,
|
||||
) {
|
||||
return await Container.get(TaskManager).startTask(
|
||||
additionalData,
|
||||
jobType,
|
||||
settings,
|
||||
executeFunctions,
|
||||
inputData,
|
||||
node,
|
||||
workflow,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
siblingParameters,
|
||||
mode,
|
||||
executeData,
|
||||
defaultReturnRunIndex,
|
||||
selfData,
|
||||
contextNodeName,
|
||||
);
|
||||
},
|
||||
logAiEvent: (eventName: keyof AiEventMap, payload: AiEventPayload) =>
|
||||
eventService.emit(eventName, payload),
|
||||
};
|
||||
|
||||
@@ -245,7 +245,7 @@ export class WorkflowRunner {
|
||||
{ executionId },
|
||||
);
|
||||
let workflowExecution: PCancelable<IRun>;
|
||||
await this.executionRepository.updateStatus(executionId, 'running');
|
||||
await this.executionRepository.updateStatus(executionId, 'running'); // write
|
||||
|
||||
try {
|
||||
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
|
||||
|
||||
58
packages/core/src/Agent/index.ts
Normal file
58
packages/core/src/Agent/index.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import type {
|
||||
IExecuteFunctions,
|
||||
Workflow,
|
||||
IRunExecutionData,
|
||||
INodeExecutionData,
|
||||
ITaskDataConnections,
|
||||
INode,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
WorkflowExecuteMode,
|
||||
INodeParameters,
|
||||
IExecuteData,
|
||||
IDataObject,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
export const createAgentStartJob = (
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
inputData: ITaskDataConnections,
|
||||
node: INode,
|
||||
workflow: Workflow,
|
||||
runExecutionData: IRunExecutionData,
|
||||
runIndex: number,
|
||||
activeNodeName: string,
|
||||
connectionInputData: INodeExecutionData[],
|
||||
siblingParameters: INodeParameters,
|
||||
mode: WorkflowExecuteMode,
|
||||
executeData?: IExecuteData,
|
||||
defaultReturnRunIndex?: number,
|
||||
selfData?: IDataObject,
|
||||
contextNodeName?: string,
|
||||
): IExecuteFunctions['startJob'] => {
|
||||
return async function startJob<T = unknown>(
|
||||
this: IExecuteFunctions,
|
||||
jobType: string,
|
||||
settings: unknown,
|
||||
itemIndex: number,
|
||||
): Promise<T> {
|
||||
return await additionalData.startAgentJob<T>(
|
||||
additionalData,
|
||||
jobType,
|
||||
settings,
|
||||
this,
|
||||
inputData,
|
||||
node,
|
||||
workflow,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
siblingParameters,
|
||||
mode,
|
||||
executeData,
|
||||
defaultReturnRunIndex,
|
||||
selfData,
|
||||
contextNodeName,
|
||||
);
|
||||
};
|
||||
};
|
||||
@@ -132,6 +132,7 @@ import { Readable } from 'stream';
|
||||
import Container from 'typedi';
|
||||
import url, { URL, URLSearchParams } from 'url';
|
||||
|
||||
import { createAgentStartJob } from './Agent';
|
||||
import { BinaryDataService } from './BinaryData/BinaryData.service';
|
||||
import type { BinaryData } from './BinaryData/types';
|
||||
import { binaryToBuffer } from './BinaryData/utils';
|
||||
@@ -3788,6 +3789,17 @@ export function getExecuteFunctions(
|
||||
additionalData.setExecutionStatus('waiting');
|
||||
}
|
||||
},
|
||||
logNodeOutput(...args: unknown[]): void {
|
||||
if (mode === 'manual') {
|
||||
// @ts-expect-error `args` is spreadable
|
||||
this.sendMessageToUI(...args);
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.env.CODE_ENABLE_STDOUT === 'true') {
|
||||
console.log(`[Workflow "${this.getWorkflow().id}"][Node "${node.name}"]`, ...args);
|
||||
}
|
||||
},
|
||||
sendMessageToUI(...args: any[]): void {
|
||||
if (mode !== 'manual') {
|
||||
return;
|
||||
@@ -3905,6 +3917,19 @@ export function getExecuteFunctions(
|
||||
});
|
||||
},
|
||||
getParentCallbackManager: () => additionalData.parentCallbackManager,
|
||||
startJob: createAgentStartJob(
|
||||
additionalData,
|
||||
inputData,
|
||||
node,
|
||||
workflow,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
node.name,
|
||||
connectionInputData,
|
||||
{},
|
||||
mode,
|
||||
executeData,
|
||||
),
|
||||
};
|
||||
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
|
||||
}
|
||||
|
||||
@@ -8,6 +8,9 @@ import {
|
||||
type INodeTypeDescription,
|
||||
} from 'n8n-workflow';
|
||||
import set from 'lodash/set';
|
||||
import Container from 'typedi';
|
||||
import { TaskRunnersConfig } from '@n8n/config';
|
||||
|
||||
import { javascriptCodeDescription } from './descriptions/JavascriptCodeDescription';
|
||||
import { pythonCodeDescription } from './descriptions/PythonCodeDescription';
|
||||
import { JavaScriptSandbox } from './JavaScriptSandbox';
|
||||
@@ -92,6 +95,8 @@ export class Code implements INodeType {
|
||||
};
|
||||
|
||||
async execute(this: IExecuteFunctions) {
|
||||
const runnersConfig = Container.get(TaskRunnersConfig);
|
||||
|
||||
const nodeMode = this.getNodeParameter('mode', 0) as CodeExecutionMode;
|
||||
const workflowMode = this.getMode();
|
||||
|
||||
@@ -102,6 +107,22 @@ export class Code implements INodeType {
|
||||
: 'javaScript';
|
||||
const codeParameterName = language === 'python' ? 'pythonCode' : 'jsCode';
|
||||
|
||||
if (!runnersConfig.disabled && language === 'javaScript') {
|
||||
// TODO: once per item
|
||||
const code = this.getNodeParameter(codeParameterName, 0) as string;
|
||||
const items = await this.startJob<INodeExecutionData[]>(
|
||||
{ javaScript: 'javascript', python: 'python' }[language] ?? language,
|
||||
{
|
||||
code,
|
||||
nodeMode,
|
||||
workflowMode,
|
||||
},
|
||||
0,
|
||||
);
|
||||
|
||||
return [items];
|
||||
}
|
||||
|
||||
const getSandbox = (index = 0) => {
|
||||
const code = this.getNodeParameter(codeParameterName, index) as string;
|
||||
const context = getSandboxContext.call(this, index);
|
||||
|
||||
@@ -837,6 +837,7 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@kafkajs/confluent-schema-registry": "1.0.6",
|
||||
"@n8n/config": "workspace:*",
|
||||
"@n8n/imap": "workspace:*",
|
||||
"@n8n/vm2": "3.9.25",
|
||||
"amqplib": "0.10.3",
|
||||
|
||||
@@ -952,6 +952,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
|
||||
};
|
||||
|
||||
getParentCallbackManager(): CallbackManager | undefined;
|
||||
|
||||
startJob<T = unknown>(jobType: string, settings: unknown, itemIndex: number): Promise<T>;
|
||||
};
|
||||
|
||||
export interface IExecuteSingleFunctions extends BaseExecutionFunctions {
|
||||
@@ -2239,6 +2241,26 @@ export interface IWorkflowExecuteAdditionalData {
|
||||
secretsHelpers: SecretsHelpersBase;
|
||||
logAiEvent: (eventName: AiEvent, payload: AiEventPayload) => void;
|
||||
parentCallbackManager?: CallbackManager;
|
||||
startAgentJob<T>(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
jobType: string,
|
||||
settings: unknown,
|
||||
executeFunctions: IExecuteFunctions,
|
||||
inputData: ITaskDataConnections,
|
||||
node: INode,
|
||||
workflow: Workflow,
|
||||
runExecutionData: IRunExecutionData,
|
||||
runIndex: number,
|
||||
itemIndex: number,
|
||||
activeNodeName: string,
|
||||
connectionInputData: INodeExecutionData[],
|
||||
siblingParameters: INodeParameters,
|
||||
mode: WorkflowExecuteMode,
|
||||
executeData?: IExecuteData,
|
||||
defaultReturnRunIndex?: number,
|
||||
selfData?: IDataObject,
|
||||
contextNodeName?: string,
|
||||
): Promise<T>;
|
||||
}
|
||||
|
||||
export type WorkflowExecuteMode =
|
||||
|
||||
579
pnpm-lock.yaml
generated
579
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user