mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-21 11:49:59 +00:00
fix(core): Add cross-origin checks on push endpoints (#14365)
This commit is contained in:
committed by
GitHub
parent
4110f3188e
commit
178628a59b
@@ -183,15 +183,6 @@ export const schema = {
|
|||||||
env: 'EXTERNAL_FRONTEND_HOOKS_URLS',
|
env: 'EXTERNAL_FRONTEND_HOOKS_URLS',
|
||||||
},
|
},
|
||||||
|
|
||||||
push: {
|
|
||||||
backend: {
|
|
||||||
format: ['sse', 'websocket'] as const,
|
|
||||||
default: 'websocket',
|
|
||||||
env: 'N8N_PUSH_BACKEND',
|
|
||||||
doc: 'Backend to use for push notifications',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
binaryDataManager: {
|
binaryDataManager: {
|
||||||
availableModes: {
|
availableModes: {
|
||||||
format: 'comma-separated-list',
|
format: 'comma-separated-list',
|
||||||
|
|||||||
@@ -1,43 +1,195 @@
|
|||||||
import { mock } from 'jest-mock-extended';
|
import type { Application } from 'express';
|
||||||
import type { WebSocket } from 'ws';
|
import { captor, mock } from 'jest-mock-extended';
|
||||||
|
import type { Server, ServerResponse } from 'node:http';
|
||||||
|
import type { Socket } from 'node:net';
|
||||||
|
import { type WebSocket, Server as WSServer } from 'ws';
|
||||||
|
|
||||||
import config from '@/config';
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||||
import { Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import { SSEPush } from '@/push/sse.push';
|
import { SSEPush } from '@/push/sse.push';
|
||||||
import type { WebSocketPushRequest, SSEPushRequest } from '@/push/types';
|
import type { WebSocketPushRequest, SSEPushRequest, PushResponse } from '@/push/types';
|
||||||
import { WebSocketPush } from '@/push/websocket.push';
|
import { WebSocketPush } from '@/push/websocket.push';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
|
import type { PushConfig } from '../push.config';
|
||||||
|
|
||||||
|
jest.mock('ws', () => ({
|
||||||
|
Server: jest.fn(),
|
||||||
|
}));
|
||||||
jest.unmock('@/push');
|
jest.unmock('@/push');
|
||||||
|
jest.mock('@/constants', () => ({
|
||||||
|
inProduction: true,
|
||||||
|
}));
|
||||||
|
|
||||||
describe('Push', () => {
|
describe('Push', () => {
|
||||||
const user = mock<User>();
|
const pushRef = 'valid-push-ref';
|
||||||
|
const host = 'example.com';
|
||||||
|
const user = mock<User>({ id: 'user-id' });
|
||||||
|
const config = mock<PushConfig>();
|
||||||
|
|
||||||
|
let push: Push;
|
||||||
const sseBackend = mockInstance(SSEPush);
|
const sseBackend = mockInstance(SSEPush);
|
||||||
const wsBackend = mockInstance(WebSocketPush);
|
const wsBackend = mockInstance(WebSocketPush);
|
||||||
|
|
||||||
test('should validate pushRef on requests for websocket backend', () => {
|
beforeEach(() => jest.resetAllMocks());
|
||||||
config.set('push.backend', 'websocket');
|
|
||||||
const push = new Push(mock(), mock(), mock());
|
|
||||||
const ws = mock<WebSocket>();
|
|
||||||
const request = mock<WebSocketPushRequest>({ user, ws });
|
|
||||||
request.query = { pushRef: '' };
|
|
||||||
push.handleRequest(request, mock());
|
|
||||||
|
|
||||||
expect(ws.send).toHaveBeenCalled();
|
describe('setupPushServer', () => {
|
||||||
expect(ws.close).toHaveBeenCalledWith(1008);
|
const restEndpoint = 'rest';
|
||||||
expect(wsBackend.add).not.toHaveBeenCalled();
|
const app = mock<Application>();
|
||||||
|
const server = mock<Server>();
|
||||||
|
// @ts-expect-error `jest.spyOn` typings don't allow `constructor`
|
||||||
|
const wssSpy = jest.spyOn(WSServer.prototype, 'constructor') as jest.SpyInstance<WSServer>;
|
||||||
|
|
||||||
|
describe('sse backend', () => {
|
||||||
|
test('should not create a WebSocket server', () => {
|
||||||
|
config.backend = 'sse';
|
||||||
|
push = new Push(config, mock(), mock(), mock(), mock());
|
||||||
|
|
||||||
|
push.setupPushServer(restEndpoint, server, app);
|
||||||
|
|
||||||
|
expect(wssSpy).not.toHaveBeenCalled();
|
||||||
|
expect(server.on).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('websocket backend', () => {
|
||||||
|
let onUpgrade: (request: WebSocketPushRequest, socket: Socket, head: Buffer) => void;
|
||||||
|
const wsServer = mock<WSServer>();
|
||||||
|
const socket = mock<Socket>();
|
||||||
|
const upgradeHead = mock<Buffer>();
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
config.backend = 'websocket';
|
||||||
|
push = new Push(config, mock(), mock(), mock(), mock());
|
||||||
|
wssSpy.mockReturnValue(wsServer);
|
||||||
|
|
||||||
|
push.setupPushServer(restEndpoint, server, app);
|
||||||
|
|
||||||
|
expect(wssSpy).toHaveBeenCalledWith({ noServer: true });
|
||||||
|
const onUpgradeCaptor = captor<typeof onUpgrade>();
|
||||||
|
expect(server.on).toHaveBeenCalledWith('upgrade', onUpgradeCaptor);
|
||||||
|
onUpgrade = onUpgradeCaptor.value;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should not upgrade non-push urls', () => {
|
||||||
|
const request = mock<WebSocketPushRequest>({ url: '/rest/testing' });
|
||||||
|
|
||||||
|
onUpgrade(request, socket, upgradeHead);
|
||||||
|
|
||||||
|
expect(wsServer.handleUpgrade).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should upgrade push url, and route it to express', () => {
|
||||||
|
const request = mock<WebSocketPushRequest>({ url: '/rest/push' });
|
||||||
|
|
||||||
|
onUpgrade(request, socket, upgradeHead);
|
||||||
|
|
||||||
|
const handleUpgradeCaptor = captor<(ws: WebSocket) => void>();
|
||||||
|
expect(wsServer.handleUpgrade).toHaveBeenCalledWith(
|
||||||
|
request,
|
||||||
|
socket,
|
||||||
|
upgradeHead,
|
||||||
|
handleUpgradeCaptor,
|
||||||
|
);
|
||||||
|
|
||||||
|
const ws = mock<WebSocket>();
|
||||||
|
handleUpgradeCaptor.value(ws);
|
||||||
|
|
||||||
|
expect(request.ws).toBe(ws);
|
||||||
|
|
||||||
|
const serverResponseCaptor = captor<ServerResponse>();
|
||||||
|
// @ts-expect-error `handle` isn't documented
|
||||||
|
expect(app.handle).toHaveBeenCalledWith(request, serverResponseCaptor);
|
||||||
|
|
||||||
|
serverResponseCaptor.value.writeHead(200);
|
||||||
|
expect(ws.close).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
serverResponseCaptor.value.writeHead(404);
|
||||||
|
expect(ws.close).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
test('should validate pushRef on requests for SSE backend', () => {
|
describe('handleRequest', () => {
|
||||||
config.set('push.backend', 'sse');
|
const req = mock<SSEPushRequest | WebSocketPushRequest>({ user });
|
||||||
const push = new Push(mock(), mock(), mock());
|
const res = mock<PushResponse>();
|
||||||
const request = mock<SSEPushRequest>({ user, ws: undefined });
|
const ws = mock<WebSocket>();
|
||||||
request.query = { pushRef: '' };
|
const backendNames = ['sse', 'websocket'] as const;
|
||||||
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);
|
|
||||||
|
|
||||||
expect(sseBackend.add).not.toHaveBeenCalled();
|
beforeEach(() => {
|
||||||
|
res.status.mockReturnThis();
|
||||||
|
|
||||||
|
req.headers.host = host;
|
||||||
|
req.headers.origin = `https://${host}`;
|
||||||
|
req.query = { pushRef };
|
||||||
|
});
|
||||||
|
|
||||||
|
describe.each(backendNames)('%s backend', (backendName) => {
|
||||||
|
const backend = backendName === 'sse' ? sseBackend : wsBackend;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
config.backend = backendName;
|
||||||
|
push = new Push(config, mock(), mock(), mock(), mock());
|
||||||
|
req.ws = backendName === 'sse' ? undefined : ws;
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('should throw on invalid origin', () => {
|
||||||
|
test.each(['https://subdomain.example.com', 'https://123example.com', undefined])(
|
||||||
|
'%s',
|
||||||
|
(origin) => {
|
||||||
|
req.headers.origin = origin;
|
||||||
|
|
||||||
|
if (backendName === 'sse') {
|
||||||
|
expect(() => push.handleRequest(req, res)).toThrow(
|
||||||
|
new BadRequestError('Invalid origin!'),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
push.handleRequest(req, res);
|
||||||
|
expect(ws.send).toHaveBeenCalledWith('Invalid origin!');
|
||||||
|
expect(ws.close).toHaveBeenCalledWith(1008);
|
||||||
|
}
|
||||||
|
expect(backend.add).not.toHaveBeenCalled();
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should throw if pushRef is invalid', () => {
|
||||||
|
req.query = { pushRef: '' };
|
||||||
|
|
||||||
|
if (backendName === 'sse') {
|
||||||
|
expect(() => push.handleRequest(req, res)).toThrow(
|
||||||
|
new BadRequestError('The query parameter "pushRef" is missing!'),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
push.handleRequest(req, mock());
|
||||||
|
expect(ws.send).toHaveBeenCalled();
|
||||||
|
expect(ws.close).toHaveBeenCalledWith(1008);
|
||||||
|
}
|
||||||
|
expect(backend.add).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should add the connection if pushRef is valid', () => {
|
||||||
|
const emitSpy = jest.spyOn(push, 'emit');
|
||||||
|
|
||||||
|
push.handleRequest(req, res);
|
||||||
|
|
||||||
|
const connection = backendName === 'sse' ? { req, res } : ws;
|
||||||
|
expect(backend.add).toHaveBeenCalledWith(pushRef, user.id, connection);
|
||||||
|
expect(emitSpy).toHaveBeenCalledWith('editorUiConnected', pushRef);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (backendName === 'websocket') {
|
||||||
|
test('should respond with 401 if request is not WebSocket', () => {
|
||||||
|
req.ws = undefined;
|
||||||
|
|
||||||
|
push.handleRequest(req, res);
|
||||||
|
|
||||||
|
expect(res.status).toHaveBeenCalledWith(401);
|
||||||
|
expect(res.send).toHaveBeenCalledWith('Unauthorized');
|
||||||
|
expect(backend.add).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
116
packages/cli/src/push/__tests__/sse.push.test.ts
Normal file
116
packages/cli/src/push/__tests__/sse.push.test.ts
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
import { type PushMessage } from '@n8n/api-types';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import EventEmitter from 'node:events';
|
||||||
|
|
||||||
|
import { SSEPush } from '@/push/sse.push';
|
||||||
|
import type { PushRequest, PushResponse } from '@/push/types';
|
||||||
|
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
const createMockConnection = () => {
|
||||||
|
const req = mock(new EventEmitter() as PushRequest);
|
||||||
|
req.socket = mock();
|
||||||
|
const res = mock(new EventEmitter() as PushResponse);
|
||||||
|
return { req, res };
|
||||||
|
};
|
||||||
|
|
||||||
|
describe('SSEPush', () => {
|
||||||
|
const userId = 'test-user';
|
||||||
|
const executionId = 'test-execution-id';
|
||||||
|
|
||||||
|
const pushRef = 'push-ref';
|
||||||
|
const connection = createMockConnection();
|
||||||
|
const { req, res } = connection;
|
||||||
|
|
||||||
|
const pushRef2 = 'push-ref-2';
|
||||||
|
const connection2 = createMockConnection();
|
||||||
|
|
||||||
|
const pushMessage: PushMessage = { type: 'executionRecovered', data: { executionId } };
|
||||||
|
const expectedMsg = JSON.stringify(pushMessage);
|
||||||
|
|
||||||
|
let ssePush: SSEPush;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.resetAllMocks();
|
||||||
|
ssePush = new SSEPush(mock(), mock());
|
||||||
|
ssePush.add(pushRef, userId, connection);
|
||||||
|
ssePush.add(pushRef2, userId, connection2);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('add', () => {
|
||||||
|
it('adds a connection', () => {
|
||||||
|
expect(req.socket.setTimeout).toHaveBeenCalledWith(0);
|
||||||
|
expect(req.socket.setNoDelay).toHaveBeenCalledWith(true);
|
||||||
|
expect(req.socket.setKeepAlive).toHaveBeenCalledWith(true);
|
||||||
|
|
||||||
|
expect(res.setHeader).toHaveBeenCalledWith(
|
||||||
|
'Content-Type',
|
||||||
|
'text/event-stream; charset=UTF-8',
|
||||||
|
);
|
||||||
|
expect(res.setHeader).toHaveBeenCalledWith('Cache-Control', 'no-cache');
|
||||||
|
expect(res.setHeader).toHaveBeenCalledWith('Connection', 'keep-alive');
|
||||||
|
expect(res.writeHead).toHaveBeenCalledWith(200);
|
||||||
|
|
||||||
|
expect(res.write).toHaveBeenCalledWith(':ok\n\n');
|
||||||
|
expect(res.flush).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('closes a connection', () => {
|
||||||
|
test.each([
|
||||||
|
['end', req],
|
||||||
|
['close', req],
|
||||||
|
['finish', res],
|
||||||
|
])('on "%s" event', (event, emitter) => {
|
||||||
|
expect(ssePush.hasPushRef(pushRef)).toBe(true);
|
||||||
|
emitter.emit(event);
|
||||||
|
expect(ssePush.hasPushRef(pushRef)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('sends data', () => {
|
||||||
|
beforeEach(() => jest.clearAllMocks());
|
||||||
|
|
||||||
|
it('to one connection', () => {
|
||||||
|
ssePush.sendToOne(pushMessage, pushRef);
|
||||||
|
|
||||||
|
expect(connection.res.write).toHaveBeenCalledWith(`data: ${expectedMsg}\n\n`);
|
||||||
|
expect(connection.res.flush).toHaveBeenCalled();
|
||||||
|
expect(connection2.res.write).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('to all connections', () => {
|
||||||
|
ssePush.sendToAll(pushMessage);
|
||||||
|
|
||||||
|
expect(connection.res.write).toHaveBeenCalledWith(`data: ${expectedMsg}\n\n`);
|
||||||
|
expect(connection.res.flush).toHaveBeenCalled();
|
||||||
|
expect(connection2.res.write).toHaveBeenCalledWith(`data: ${expectedMsg}\n\n`);
|
||||||
|
expect(connection2.res.flush).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('to a specific user', () => {
|
||||||
|
ssePush.sendToUsers(pushMessage, [userId]);
|
||||||
|
|
||||||
|
expect(connection.res.write).toHaveBeenCalledWith(`data: ${expectedMsg}\n\n`);
|
||||||
|
expect(connection.res.flush).toHaveBeenCalled();
|
||||||
|
expect(connection2.res.write).toHaveBeenCalledWith(`data: ${expectedMsg}\n\n`);
|
||||||
|
expect(connection2.res.flush).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('pings all connections', () => {
|
||||||
|
jest.runOnlyPendingTimers();
|
||||||
|
|
||||||
|
expect(connection.res.write).toHaveBeenCalledWith(':ping\n\n');
|
||||||
|
expect(connection.res.flush).toHaveBeenCalled();
|
||||||
|
expect(connection2.res.write).toHaveBeenCalledWith(':ping\n\n');
|
||||||
|
expect(connection2.res.flush).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('closes all connections', () => {
|
||||||
|
ssePush.closeAllConnections();
|
||||||
|
|
||||||
|
expect(connection.res.end).toHaveBeenCalled();
|
||||||
|
expect(connection2.res.end).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -5,19 +5,18 @@ import { ServerResponse } from 'http';
|
|||||||
import type { Server } from 'http';
|
import type { Server } from 'http';
|
||||||
import { InstanceSettings, Logger } from 'n8n-core';
|
import { InstanceSettings, Logger } from 'n8n-core';
|
||||||
import { deepCopy } from 'n8n-workflow';
|
import { deepCopy } from 'n8n-workflow';
|
||||||
import type { Socket } from 'net';
|
|
||||||
import { parse as parseUrl } from 'url';
|
import { parse as parseUrl } from 'url';
|
||||||
import { Server as WSServer } from 'ws';
|
import { Server as WSServer } from 'ws';
|
||||||
|
|
||||||
import { AuthService } from '@/auth/auth.service';
|
import { AuthService } from '@/auth/auth.service';
|
||||||
import config from '@/config';
|
import { inProduction, TRIMMED_TASK_DATA_CONNECTIONS } from '@/constants';
|
||||||
import { TRIMMED_TASK_DATA_CONNECTIONS } from '@/constants';
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||||
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
||||||
import { TypedEmitter } from '@/typed-emitter';
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
|
import { PushConfig } from './push.config';
|
||||||
import { SSEPush } from './sse.push';
|
import { SSEPush } from './sse.push';
|
||||||
import type { OnPushMessage, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
import type { OnPushMessage, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
||||||
import { WebSocketPush } from './websocket.push';
|
import { WebSocketPush } from './websocket.push';
|
||||||
@@ -27,8 +26,6 @@ type PushEvents = {
|
|||||||
message: OnPushMessage;
|
message: OnPushMessage;
|
||||||
};
|
};
|
||||||
|
|
||||||
const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Max allowed size of a push message in bytes. Events going through the pubsub
|
* Max allowed size of a push message in bytes. Events going through the pubsub
|
||||||
* channel are trimmed if exceeding this size.
|
* channel are trimmed if exceeding this size.
|
||||||
@@ -44,44 +41,94 @@ const MAX_PAYLOAD_SIZE_BYTES = 5 * 1024 * 1024; // 5 MiB
|
|||||||
*/
|
*/
|
||||||
@Service()
|
@Service()
|
||||||
export class Push extends TypedEmitter<PushEvents> {
|
export class Push extends TypedEmitter<PushEvents> {
|
||||||
isBidirectional = useWebSockets;
|
private useWebSockets = this.config.backend === 'websocket';
|
||||||
|
|
||||||
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
isBidirectional = this.useWebSockets;
|
||||||
|
|
||||||
|
private backend = this.useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
|
private readonly config: PushConfig,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly publisher: Publisher,
|
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
|
private readonly authService: AuthService,
|
||||||
|
private readonly publisher: Publisher,
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
this.logger = this.logger.scoped('push');
|
this.logger = this.logger.scoped('push');
|
||||||
|
|
||||||
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
|
if (this.useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
getBackend() {
|
getBackend() {
|
||||||
return this.backend;
|
return this.backend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Sets up the main express app to upgrade websocket connections */
|
||||||
|
setupPushServer(restEndpoint: string, server: Server, app: Application) {
|
||||||
|
if (this.useWebSockets) {
|
||||||
|
const wsServer = new WSServer({ noServer: true });
|
||||||
|
server.on('upgrade', (request: WebSocketPushRequest, socket, upgradeHead) => {
|
||||||
|
if (parseUrl(request.url).pathname === `/${restEndpoint}/push`) {
|
||||||
|
wsServer.handleUpgrade(request, socket, upgradeHead, (ws) => {
|
||||||
|
request.ws = ws;
|
||||||
|
|
||||||
|
const response = new ServerResponse(request);
|
||||||
|
response.writeHead = (statusCode) => {
|
||||||
|
if (statusCode > 200) ws.close();
|
||||||
|
return response;
|
||||||
|
};
|
||||||
|
|
||||||
|
// @ts-expect-error `handle` isn't documented
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
||||||
|
app.handle(request, response);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Sets up the push endppoint that the frontend connects to. */
|
||||||
|
setupPushHandler(restEndpoint: string, app: Application) {
|
||||||
|
app.use(
|
||||||
|
`/${restEndpoint}/push`,
|
||||||
|
// eslint-disable-next-line @typescript-eslint/unbound-method
|
||||||
|
this.authService.authMiddleware,
|
||||||
|
(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) =>
|
||||||
|
this.handleRequest(req, res),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
|
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
|
||||||
const {
|
const {
|
||||||
ws,
|
ws,
|
||||||
query: { pushRef },
|
query: { pushRef },
|
||||||
user,
|
user,
|
||||||
|
headers,
|
||||||
} = req;
|
} = req;
|
||||||
|
|
||||||
|
let connectionError = '';
|
||||||
if (!pushRef) {
|
if (!pushRef) {
|
||||||
|
connectionError = 'The query parameter "pushRef" is missing!';
|
||||||
|
} else if (
|
||||||
|
inProduction &&
|
||||||
|
!(headers.origin === `http://${headers.host}` || headers.origin === `https://${headers.host}`)
|
||||||
|
) {
|
||||||
|
connectionError = 'Invalid origin!';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectionError) {
|
||||||
if (ws) {
|
if (ws) {
|
||||||
ws.send('The query parameter "pushRef" is missing!');
|
ws.send(connectionError);
|
||||||
ws.close(1008);
|
ws.close(1008);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw new BadRequestError('The query parameter "pushRef" is missing!');
|
throw new BadRequestError(connectionError);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.ws) {
|
if (req.ws) {
|
||||||
(this.backend as WebSocketPush).add(pushRef, user.id, req.ws);
|
(this.backend as WebSocketPush).add(pushRef, user.id, req.ws);
|
||||||
} else if (!useWebSockets) {
|
} else if (!this.useWebSockets) {
|
||||||
(this.backend as SSEPush).add(pushRef, user.id, { req, res });
|
(this.backend as SSEPush).add(pushRef, user.id, { req, res });
|
||||||
} else {
|
} else {
|
||||||
res.status(401).send('Unauthorized');
|
res.status(401).send('Unauthorized');
|
||||||
@@ -182,38 +229,3 @@ export class Push extends TypedEmitter<PushEvents> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
|
|
||||||
if (useWebSockets) {
|
|
||||||
const wsServer = new WSServer({ noServer: true });
|
|
||||||
server.on('upgrade', (request: WebSocketPushRequest, socket: Socket, head) => {
|
|
||||||
if (parseUrl(request.url).pathname === `/${restEndpoint}/push`) {
|
|
||||||
wsServer.handleUpgrade(request, socket, head, (ws) => {
|
|
||||||
request.ws = ws;
|
|
||||||
|
|
||||||
const response = new ServerResponse(request);
|
|
||||||
response.writeHead = (statusCode) => {
|
|
||||||
if (statusCode > 200) ws.close();
|
|
||||||
return response;
|
|
||||||
};
|
|
||||||
|
|
||||||
// @ts-ignore
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
|
||||||
app.handle(request, response);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export const setupPushHandler = (restEndpoint: string, app: Application) => {
|
|
||||||
const endpoint = `/${restEndpoint}/push`;
|
|
||||||
const push = Container.get(Push);
|
|
||||||
const authService = Container.get(AuthService);
|
|
||||||
app.use(
|
|
||||||
endpoint,
|
|
||||||
// eslint-disable-next-line @typescript-eslint/unbound-method
|
|
||||||
authService.authMiddleware,
|
|
||||||
(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) => push.handleRequest(req, res),
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|||||||
8
packages/cli/src/push/push.config.ts
Normal file
8
packages/cli/src/push/push.config.ts
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
import { Config, Env } from '@n8n/config';
|
||||||
|
|
||||||
|
@Config
|
||||||
|
export class PushConfig {
|
||||||
|
/** Backend to use for push notifications */
|
||||||
|
@Env('N8N_PUSH_BACKEND')
|
||||||
|
backend: 'sse' | 'websocket' = 'websocket';
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
import type { Response } from 'express';
|
import type { Request, Response } from 'express';
|
||||||
import type { WebSocket } from 'ws';
|
import type { WebSocket } from 'ws';
|
||||||
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
@@ -9,7 +9,10 @@ import type { AuthenticatedRequest } from '@/requests';
|
|||||||
export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
|
export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
|
||||||
|
|
||||||
export type SSEPushRequest = PushRequest & { ws: undefined };
|
export type SSEPushRequest = PushRequest & { ws: undefined };
|
||||||
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
export type WebSocketPushRequest = PushRequest & {
|
||||||
|
ws: WebSocket;
|
||||||
|
headers: Request['headers'];
|
||||||
|
};
|
||||||
|
|
||||||
export type PushResponse = Response & {
|
export type PushResponse = Response & {
|
||||||
req: PushRequest;
|
req: PushRequest;
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
|||||||
import { handleMfaDisable, isMfaFeatureEnabled } from '@/mfa/helpers';
|
import { handleMfaDisable, isMfaFeatureEnabled } from '@/mfa/helpers';
|
||||||
import { PostHogClient } from '@/posthog';
|
import { PostHogClient } from '@/posthog';
|
||||||
import { isApiEnabled, loadPublicApiVersions } from '@/public-api';
|
import { isApiEnabled, loadPublicApiVersions } from '@/public-api';
|
||||||
import { setupPushServer, setupPushHandler, Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import type { APIRequest } from '@/requests';
|
import type { APIRequest } from '@/requests';
|
||||||
import * as ResponseHelper from '@/response-helper';
|
import * as ResponseHelper from '@/response-helper';
|
||||||
import type { FrontendService } from '@/services/frontend.service';
|
import type { FrontendService } from '@/services/frontend.service';
|
||||||
@@ -207,9 +207,10 @@ export class Server extends AbstractServer {
|
|||||||
this.app.use(cookieParser());
|
this.app.use(cookieParser());
|
||||||
|
|
||||||
const { restEndpoint, app } = this;
|
const { restEndpoint, app } = this;
|
||||||
setupPushHandler(restEndpoint, app);
|
|
||||||
|
|
||||||
const push = Container.get(Push);
|
const push = Container.get(Push);
|
||||||
|
push.setupPushHandler(restEndpoint, app);
|
||||||
|
|
||||||
if (push.isBidirectional) {
|
if (push.isBidirectional) {
|
||||||
const { CollaborationService } = await import('@/collaboration/collaboration.service');
|
const { CollaborationService } = await import('@/collaboration/collaboration.service');
|
||||||
|
|
||||||
@@ -443,6 +444,6 @@ export class Server extends AbstractServer {
|
|||||||
|
|
||||||
protected setupPushServer(): void {
|
protected setupPushServer(): void {
|
||||||
const { restEndpoint, server, app } = this;
|
const { restEndpoint, server, app } = this;
|
||||||
setupPushServer(restEndpoint, server, app);
|
Container.get(Push).setupPushServer(restEndpoint, server, app);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import { License } from '@/license';
|
|||||||
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
||||||
import { ModulesConfig } from '@/modules/modules.config';
|
import { ModulesConfig } from '@/modules/modules.config';
|
||||||
import { isApiEnabled } from '@/public-api';
|
import { isApiEnabled } from '@/public-api';
|
||||||
|
import { PushConfig } from '@/push/push.config';
|
||||||
import type { CommunityPackagesService } from '@/services/community-packages.service';
|
import type { CommunityPackagesService } from '@/services/community-packages.service';
|
||||||
import { getSamlLoginLabel } from '@/sso.ee/saml/saml-helpers';
|
import { getSamlLoginLabel } from '@/sso.ee/saml/saml-helpers';
|
||||||
import { getCurrentAuthenticationMethod } from '@/sso.ee/sso-helpers';
|
import { getCurrentAuthenticationMethod } from '@/sso.ee/sso-helpers';
|
||||||
@@ -46,6 +47,7 @@ export class FrontendService {
|
|||||||
private readonly urlService: UrlService,
|
private readonly urlService: UrlService,
|
||||||
private readonly securityConfig: SecurityConfig,
|
private readonly securityConfig: SecurityConfig,
|
||||||
private readonly modulesConfig: ModulesConfig,
|
private readonly modulesConfig: ModulesConfig,
|
||||||
|
private readonly pushConfig: PushConfig,
|
||||||
) {
|
) {
|
||||||
loadNodesAndCredentials.addPostProcessor(async () => await this.generateTypes());
|
loadNodesAndCredentials.addPostProcessor(async () => await this.generateTypes());
|
||||||
void this.generateTypes();
|
void this.generateTypes();
|
||||||
@@ -165,7 +167,7 @@ export class FrontendService {
|
|||||||
host: this.globalConfig.templates.host,
|
host: this.globalConfig.templates.host,
|
||||||
},
|
},
|
||||||
executionMode: config.getEnv('executions.mode'),
|
executionMode: config.getEnv('executions.mode'),
|
||||||
pushBackend: config.getEnv('push.backend'),
|
pushBackend: this.pushConfig.backend,
|
||||||
communityNodesEnabled: this.globalConfig.nodes.communityPackages.enabled,
|
communityNodesEnabled: this.globalConfig.nodes.communityPackages.enabled,
|
||||||
deployment: {
|
deployment: {
|
||||||
type: config.getEnv('deployment.type'),
|
type: config.getEnv('deployment.type'),
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor
|
|||||||
import * as testDb from '@test-integration/test-db';
|
import * as testDb from '@test-integration/test-db';
|
||||||
|
|
||||||
describe('CollaborationService', () => {
|
describe('CollaborationService', () => {
|
||||||
mockInstance(Push, new Push(mock(), mock(), mock()));
|
mockInstance(Push, new Push(mock(), mock(), mock(), mock(), mock()));
|
||||||
let pushService: Push;
|
let pushService: Push;
|
||||||
let collaborationService: CollaborationService;
|
let collaborationService: CollaborationService;
|
||||||
let owner: User;
|
let owner: User;
|
||||||
|
|||||||
Reference in New Issue
Block a user