diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 0cdcac07df..26c5829df4 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -503,58 +503,74 @@ export type IPushData = | PushDataRemoveNodeType | PushDataTestWebhook | PushDataNodeDescriptionUpdated - | PushDataExecutionRecovered; + | PushDataExecutionRecovered + | PushDataActiveWorkflowUsersChanged; -type PushDataExecutionRecovered = { +type PushDataActiveWorkflowUsersChanged = { + data: IActiveWorkflowUsersChanged; + type: 'activeWorkflowUsersChanged'; +}; + +export type PushDataExecutionRecovered = { data: IPushDataExecutionRecovered; type: 'executionRecovered'; }; -type PushDataExecutionFinished = { +export type PushDataExecutionFinished = { data: IPushDataExecutionFinished; type: 'executionFinished'; }; -type PushDataExecutionStarted = { +export type PushDataExecutionStarted = { data: IPushDataExecutionStarted; type: 'executionStarted'; }; -type PushDataExecuteAfter = { +export type PushDataExecuteAfter = { data: IPushDataNodeExecuteAfter; type: 'nodeExecuteAfter'; }; -type PushDataExecuteBefore = { +export type PushDataExecuteBefore = { data: IPushDataNodeExecuteBefore; type: 'nodeExecuteBefore'; }; -type PushDataConsoleMessage = { +export type PushDataConsoleMessage = { data: IPushDataConsoleMessage; type: 'sendConsoleMessage'; }; -type PushDataReloadNodeType = { +export type PushDataReloadNodeType = { data: IPushDataReloadNodeType; type: 'reloadNodeType'; }; -type PushDataRemoveNodeType = { +export type PushDataRemoveNodeType = { data: IPushDataRemoveNodeType; type: 'removeNodeType'; }; -type PushDataTestWebhook = { +export type PushDataTestWebhook = { data: IPushDataTestWebhook; type: 'testWebhookDeleted' | 'testWebhookReceived'; }; -type PushDataNodeDescriptionUpdated = { +export type PushDataNodeDescriptionUpdated = { data: undefined; type: 'nodeDescriptionUpdated'; }; +export interface IActiveWorkflowUser { + user: User; + lastSeen: Date; +} + +export interface IActiveWorkflowUsersChanged { + workflowId: Workflow['id']; + activeUsers: IActiveWorkflowUser[]; +} + export interface IPushDataExecutionRecovered { executionId: string; } diff --git a/packages/cli/src/LoadNodesAndCredentials.ts b/packages/cli/src/LoadNodesAndCredentials.ts index 13931eab44..fa731f66bd 100644 --- a/packages/cli/src/LoadNodesAndCredentials.ts +++ b/packages/cli/src/LoadNodesAndCredentials.ts @@ -340,7 +340,7 @@ export class LoadNodesAndCredentials { loader.reset(); await loader.loadAll(); await this.postProcessLoaders(); - push.send('nodeDescriptionUpdated', undefined); + push.broadcast('nodeDescriptionUpdated'); }, 100); const toWatch = loader.isLazyLoaded diff --git a/packages/cli/src/collaboration/collaboration.message.ts b/packages/cli/src/collaboration/collaboration.message.ts new file mode 100644 index 0000000000..0980bdccbb --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.message.ts @@ -0,0 +1,23 @@ +export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage; + +export type WorkflowOpenedMessage = { + type: 'workflowOpened'; + workflowId: string; +}; + +export type WorkflowClosedMessage = { + type: 'workflowClosed'; + workflowId: string; +}; + +const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => { + return typeof msg === 'object' && msg !== null && 'type' in msg; +}; + +export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => { + return isWorkflowMessage(msg) && msg.type === 'workflowOpened'; +}; + +export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => { + return isWorkflowMessage(msg) && msg.type === 'workflowClosed'; +}; diff --git a/packages/cli/src/collaboration/collaboration.service.ts b/packages/cli/src/collaboration/collaboration.service.ts new file mode 100644 index 0000000000..4181995c0f --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.service.ts @@ -0,0 +1,87 @@ +import type { Workflow } from 'n8n-workflow'; +import { Service } from 'typedi'; +import { Push } from '../push'; +import { Logger } from '@/Logger'; +import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message'; +import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message'; +import { UserService } from '../services/user.service'; +import type { IActiveWorkflowUsersChanged } from '../Interfaces'; +import type { OnPushMessageEvent } from '@/push/types'; +import { CollaborationState } from '@/collaboration/collaboration.state'; + +/** + * Service for managing collaboration feature between users. E.g. keeping + * track of active users for a workflow. + */ +@Service() +export class CollaborationService { + constructor( + private readonly logger: Logger, + private readonly push: Push, + private readonly state: CollaborationState, + private readonly userService: UserService, + ) { + if (!push.isBidirectional) { + logger.warn( + 'Collaboration features are disabled because push is configured unidirectional. Use N8N_PUSH_BACKEND=websocket environment variable to enable them.', + ); + return; + } + + this.push.on('message', async (event: OnPushMessageEvent) => { + try { + await this.handleUserMessage(event.userId, event.msg); + } catch (error) { + this.logger.error('Error handling user message', { + error: error as unknown, + msg: event.msg, + userId: event.userId, + }); + } + }); + } + + async handleUserMessage(userId: string, msg: unknown) { + if (isWorkflowOpenedMessage(msg)) { + await this.handleWorkflowOpened(userId, msg); + } else if (isWorkflowClosedMessage(msg)) { + await this.handleWorkflowClosed(userId, msg); + } + } + + private async handleWorkflowOpened(userId: string, msg: WorkflowOpenedMessage) { + const { workflowId } = msg; + + this.state.addActiveWorkflowUser(workflowId, userId); + + await this.sendWorkflowUsersChangedMessage(workflowId); + } + + private async handleWorkflowClosed(userId: string, msg: WorkflowClosedMessage) { + const { workflowId } = msg; + + this.state.removeActiveWorkflowUser(workflowId, userId); + + await this.sendWorkflowUsersChangedMessage(workflowId); + } + + private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) { + const activeWorkflowUsers = this.state.getActiveWorkflowUsers(workflowId); + const workflowUserIds = activeWorkflowUsers.map((user) => user.userId); + + if (workflowUserIds.length === 0) { + return; + } + const users = await this.userService.getByIds(this.userService.getManager(), workflowUserIds); + + const msgData: IActiveWorkflowUsersChanged = { + workflowId, + activeUsers: users.map((user) => ({ + user, + lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!.lastSeen, + })), + }; + + this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds); + } +} diff --git a/packages/cli/src/collaboration/collaboration.state.ts b/packages/cli/src/collaboration/collaboration.state.ts new file mode 100644 index 0000000000..e5262ade12 --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.state.ts @@ -0,0 +1,62 @@ +import type { User } from '@/databases/entities/User'; +import type { Workflow } from 'n8n-workflow'; +import { Service } from 'typedi'; + +type ActiveWorkflowUser = { + userId: User['id']; + lastSeen: Date; +}; + +type UserStateByUserId = Map; + +type State = { + activeUsersByWorkflowId: Map; +}; + +/** + * State management for the collaboration service + */ +@Service() +export class CollaborationState { + private state: State = { + activeUsersByWorkflowId: new Map(), + }; + + addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { + const { activeUsersByWorkflowId } = this.state; + + let activeUsers = activeUsersByWorkflowId.get(workflowId); + if (!activeUsers) { + activeUsers = new Map(); + activeUsersByWorkflowId.set(workflowId, activeUsers); + } + + activeUsers.set(userId, { + userId, + lastSeen: new Date(), + }); + } + + removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { + const { activeUsersByWorkflowId } = this.state; + + const activeUsers = activeUsersByWorkflowId.get(workflowId); + if (!activeUsers) { + return; + } + + activeUsers.delete(userId); + if (activeUsers.size === 0) { + activeUsersByWorkflowId.delete(workflowId); + } + } + + getActiveWorkflowUsers(workflowId: Workflow['id']): ActiveWorkflowUser[] { + const workflowState = this.state.activeUsersByWorkflowId.get(workflowId); + if (!workflowState) { + return []; + } + + return [...workflowState.values()]; + } +} diff --git a/packages/cli/src/controllers/communityPackages.controller.ts b/packages/cli/src/controllers/communityPackages.controller.ts index c31ec8ff82..7438ae69d8 100644 --- a/packages/cli/src/controllers/communityPackages.controller.ts +++ b/packages/cli/src/controllers/communityPackages.controller.ts @@ -129,7 +129,7 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { - this.push.send('reloadNodeType', { + this.push.broadcast('reloadNodeType', { name: node.type, version: node.latestVersion, }); @@ -218,7 +218,7 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { - this.push.send('removeNodeType', { + this.push.broadcast('removeNodeType', { name: node.type, version: node.latestVersion, }); @@ -257,14 +257,14 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated previouslyInstalledPackage.installedNodes.forEach((node) => { - this.push.send('removeNodeType', { + this.push.broadcast('removeNodeType', { name: node.type, version: node.latestVersion, }); }); newInstalledPackage.installedNodes.forEach((node) => { - this.push.send('reloadNodeType', { + this.push.broadcast('reloadNodeType', { name: node.name, version: node.latestVersion, }); @@ -283,7 +283,7 @@ export class CommunityPackagesController { return newInstalledPackage; } catch (error) { previouslyInstalledPackage.installedNodes.forEach((node) => { - this.push.send('removeNodeType', { + this.push.broadcast('removeNodeType', { name: node.type, version: node.latestVersion, }); diff --git a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts index 1e02cc9512..14d7611a8b 100644 --- a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts +++ b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts @@ -195,7 +195,7 @@ export async function recoverExecutionDataFromEventLogMessages( push.once('editorUiConnected', function handleUiBackUp() { // add a small timeout to make sure the UI is back up setTimeout(() => { - push.send('executionRecovered', { executionId }); + push.broadcast('executionRecovered', { executionId }); }, 1000); }); } diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 3660be0556..8204cdb688 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,17 +1,29 @@ -import { jsonStringify } from 'n8n-workflow'; +import { EventEmitter } from 'events'; +import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; import { Logger } from '@/Logger'; +import type { User } from '@/databases/entities/User'; -export abstract class AbstractPush { +/** + * Abstract class for two-way push communication. + * Keeps track of user sessions and enables sending messages. + * + * @emits message when a message is received from a client + */ +export abstract class AbstractPush extends EventEmitter { protected connections: Record = {}; + protected userIdBySessionId: Record = {}; + protected abstract close(connection: T): void; protected abstract sendToOne(connection: T, data: string): void; - constructor(private readonly logger: Logger) {} + constructor(protected readonly logger: Logger) { + super(); + } - protected add(sessionId: string, connection: T): void { - const { connections } = this; + protected add(sessionId: string, userId: User['id'], connection: T): void { + const { connections, userIdBySessionId: userIdsBySessionId } = this; this.logger.debug('Add editor-UI session', { sessionId }); const existingConnection = connections[sessionId]; @@ -21,32 +33,65 @@ export abstract class AbstractPush { } connections[sessionId] = connection; + userIdsBySessionId[sessionId] = userId; + } + + protected onMessageReceived(sessionId: string, msg: unknown): void { + this.logger.debug('Received message from editor-UI', { sessionId, msg }); + const userId = this.userIdBySessionId[sessionId]; + this.emit('message', { + sessionId, + userId, + msg, + }); } protected remove(sessionId?: string): void { if (sessionId !== undefined) { this.logger.debug('Remove editor-UI session', { sessionId }); delete this.connections[sessionId]; + delete this.userIdBySessionId[sessionId]; } } - send(type: IPushDataType, data: D, sessionId: string | undefined) { + private sendToSessions(type: IPushDataType, data: D, sessionIds: string[]) { + this.logger.debug(`Send data of type "${type}" to editor-UI`, { + dataType: type, + sessionIds: sessionIds.join(', '), + }); + + const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true }); + + for (const sessionId of sessionIds) { + const connection = this.connections[sessionId]; + assert(connection); + this.sendToOne(connection, sendData); + } + } + + broadcast(type: IPushDataType, data?: D) { + this.sendToSessions(type, data, Object.keys(this.connections)); + } + + send(type: IPushDataType, data: D, sessionId: string) { const { connections } = this; - if (sessionId !== undefined && connections[sessionId] === undefined) { + if (connections[sessionId] === undefined) { this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); return; } - this.logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId }); + this.sendToSessions(type, data, [sessionId]); + } - const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true }); + /** + * Sends the given data to given users' connections + */ + sendToUsers(type: IPushDataType, data: D, userIds: Array) { + const { connections } = this; + const userSessionIds = Object.keys(connections).filter((sessionId) => + userIds.includes(this.userIdBySessionId[sessionId]), + ); - if (sessionId === undefined) { - // Send to all connected clients - Object.values(connections).forEach((connection) => this.sendToOne(connection, sendData)); - } else { - // Send only to a specific client - this.sendToOne(connections[sessionId], sendData); - } + this.sendToSessions(type, data, userSessionIds); } } diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 9fb26d795c..5d8c0b7db6 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -13,27 +13,52 @@ import { SSEPush } from './sse.push'; import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import type { IPushDataType } from '@/Interfaces'; +import type { User } from '@/databases/entities/User'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; +/** + * Push service for uni- or bi-directional communication with frontend clients. + * Uses either server-sent events (SSE, unidirectional from backend --> frontend) + * or WebSocket (bidirectional backend <--> frontend) depending on the configuration. + * + * @emits message when a message is received from a client + */ @Service() export class Push extends EventEmitter { + public isBidirectional = useWebSockets; + private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { + const { + userId, + query: { sessionId }, + } = req; if (req.ws) { - (this.backend as WebSocketPush).add(req.query.sessionId, req.ws); + (this.backend as WebSocketPush).add(sessionId, userId, req.ws); + this.backend.on('message', (msg) => this.emit('message', msg)); } else if (!useWebSockets) { - (this.backend as SSEPush).add(req.query.sessionId, { req, res }); + (this.backend as SSEPush).add(sessionId, userId, { req, res }); } else { res.status(401).send('Unauthorized'); + return; } - this.emit('editorUiConnected', req.query.sessionId); + + this.emit('editorUiConnected', sessionId); } - send(type: IPushDataType, data: D, sessionId: string | undefined = undefined) { + broadcast(type: IPushDataType, data?: D) { + this.backend.broadcast(type, data); + } + + send(type: IPushDataType, data: D, sessionId: string) { this.backend.send(type, data, sessionId); } + + sendToUsers(type: IPushDataType, data: D, userIds: Array) { + this.backend.sendToUsers(type, data, userIds); + } } export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { @@ -82,7 +107,8 @@ export const setupPushHandler = (restEndpoint: string, app: Application) => { try { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access const authCookie: string = req.cookies?.[AUTH_COOKIE_NAME] ?? ''; - await resolveJwt(authCookie); + const user = await resolveJwt(authCookie); + req.userId = user.id; } catch (error) { if (ws) { ws.send(`Unauthorized: ${(error as Error).message}`); diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index 80d02bb8a4..c19d916b86 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -3,6 +3,7 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; +import type { User } from '@/databases/entities/User'; type Connection = { req: PushRequest; res: PushResponse }; @@ -19,8 +20,8 @@ export class SSEPush extends AbstractPush { }); } - add(sessionId: string, connection: Connection) { - super.add(sessionId, connection); + add(sessionId: string, userId: User['id'], connection: Connection) { + super.add(sessionId, userId, connection); this.channel.addClient(connection.req, connection.res); } diff --git a/packages/cli/src/push/types.ts b/packages/cli/src/push/types.ts index 26baea050b..58c19c91c4 100644 --- a/packages/cli/src/push/types.ts +++ b/packages/cli/src/push/types.ts @@ -1,3 +1,4 @@ +import type { User } from '@/databases/entities/User'; import type { Request, Response } from 'express'; import type { WebSocket } from 'ws'; @@ -5,7 +6,13 @@ import type { WebSocket } from 'ws'; export type PushRequest = Request<{}, {}, {}, { sessionId: string }>; -export type SSEPushRequest = PushRequest & { ws: undefined }; -export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; +export type SSEPushRequest = PushRequest & { ws: undefined; userId: User['id'] }; +export type WebSocketPushRequest = PushRequest & { ws: WebSocket; userId: User['id'] }; export type PushResponse = Response & { req: PushRequest }; + +export type OnPushMessageEvent = { + sessionId: string; + userId: User['id']; + msg: unknown; +}; diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 090546cf58..96b916b2c4 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -2,6 +2,7 @@ import type WebSocket from 'ws'; import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; +import type { User } from '@/databases/entities/User'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -16,17 +17,34 @@ export class WebSocketPush extends AbstractPush { setInterval(() => this.pingAll(), 60 * 1000); } - add(sessionId: string, connection: WebSocket) { + add(sessionId: string, userId: User['id'], connection: WebSocket) { connection.isAlive = true; connection.on('pong', heartbeat); - super.add(sessionId, connection); + super.add(sessionId, userId, connection); + + const onMessage = (data: WebSocket.RawData) => { + try { + const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); + + this.onMessageReceived(sessionId, JSON.parse(buffer.toString('utf8'))); + } catch (error) { + this.logger.error("Couldn't parse message from editor-UI", { + error: error as unknown, + sessionId, + data, + }); + } + }; // Makes sure to remove the session if the connection is closed connection.once('close', () => { connection.off('pong', heartbeat); + connection.off('message', onMessage); this.remove(sessionId); }); + + connection.on('message', onMessage); } protected close(connection: WebSocket): void { diff --git a/packages/cli/test/unit/collaboration/collaboration.service.test.ts b/packages/cli/test/unit/collaboration/collaboration.service.test.ts new file mode 100644 index 0000000000..d4a4c5f335 --- /dev/null +++ b/packages/cli/test/unit/collaboration/collaboration.service.test.ts @@ -0,0 +1,150 @@ +import { CollaborationService } from '@/collaboration/collaboration.service'; +import type { Logger } from '@/Logger'; +import type { User } from '@/databases/entities/User'; +import type { UserService } from '@/services/user.service'; +import { CollaborationState } from '@/collaboration/collaboration.state'; +import type { Push } from '@/push'; +import type { + WorkflowClosedMessage, + WorkflowOpenedMessage, +} from '@/collaboration/collaboration.message'; + +describe('CollaborationService', () => { + let collaborationService: CollaborationService; + let mockLogger: Logger; + let mockUserService: jest.Mocked; + let state: CollaborationState; + let push: Push; + + beforeEach(() => { + mockLogger = { + warn: jest.fn(), + error: jest.fn(), + } as unknown as jest.Mocked; + mockUserService = { + getByIds: jest.fn(), + getManager: jest.fn(), + } as unknown as jest.Mocked; + + push = { + on: jest.fn(), + sendToUsers: jest.fn(), + } as unknown as Push; + state = new CollaborationState(); + collaborationService = new CollaborationService(mockLogger, push, state, mockUserService); + }); + + describe('workflow opened message', () => { + const userId = 'test-user'; + const workflowId = 'test-workflow'; + + const message: WorkflowOpenedMessage = { + type: 'workflowOpened', + workflowId, + }; + + const expectActiveUsersChangedMessage = (userIds: string[]) => { + expect(push.sendToUsers).toHaveBeenCalledWith( + 'activeWorkflowUsersChanged', + { + workflowId, + activeUsers: [ + { + user: { id: userId }, + lastSeen: expect.any(Date), + }, + ], + }, + [userId], + ); + }; + + describe('user is not yet active', () => { + it('updates state correctly', async () => { + mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]); + await collaborationService.handleUserMessage(userId, message); + + expect(state.getActiveWorkflowUsers(workflowId)).toEqual([ + { + lastSeen: expect.any(Date), + userId, + }, + ]); + }); + + it('sends active workflow users changed message', async () => { + mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]); + await collaborationService.handleUserMessage(userId, message); + + expectActiveUsersChangedMessage([userId]); + }); + }); + + describe('user is already active', () => { + beforeEach(() => { + state.addActiveWorkflowUser(workflowId, userId); + }); + + it('updates state correctly', async () => { + mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]); + await collaborationService.handleUserMessage(userId, message); + + expect(state.getActiveWorkflowUsers(workflowId)).toEqual([ + { + lastSeen: expect.any(Date), + userId, + }, + ]); + }); + + it('sends active workflow users changed message', async () => { + mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]); + await collaborationService.handleUserMessage(userId, message); + + expectActiveUsersChangedMessage([userId]); + }); + }); + }); + + describe('workflow closed message', () => { + const userId = 'test-user'; + const workflowId = 'test-workflow'; + + const message: WorkflowClosedMessage = { + type: 'workflowClosed', + workflowId, + }; + + describe('user is active', () => { + beforeEach(() => { + state.addActiveWorkflowUser(workflowId, userId); + }); + + it('updates state correctly', async () => { + await collaborationService.handleUserMessage(userId, message); + + expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]); + }); + + it('does not send active workflow users changed message', async () => { + await collaborationService.handleUserMessage(userId, message); + + expect(push.sendToUsers).not.toHaveBeenCalled(); + }); + }); + + describe('user is not active', () => { + it('updates state correctly', async () => { + await collaborationService.handleUserMessage(userId, message); + + expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]); + }); + + it('does not send active workflow users changed message', async () => { + await collaborationService.handleUserMessage(userId, message); + + expect(push.sendToUsers).not.toHaveBeenCalled(); + }); + }); + }); +}); diff --git a/packages/cli/test/unit/push/websocket.push.test.ts b/packages/cli/test/unit/push/websocket.push.test.ts new file mode 100644 index 0000000000..d4f72a5c9b --- /dev/null +++ b/packages/cli/test/unit/push/websocket.push.test.ts @@ -0,0 +1,161 @@ +import Container from 'typedi'; +import { EventEmitter } from 'events'; +import type WebSocket from 'ws'; +import { WebSocketPush } from '@/push/websocket.push'; +import { Logger } from '@/Logger'; +import type { User } from '@/databases/entities/User'; +import type { PushDataExecutionRecovered } from '@/Interfaces'; +import { mockInstance } from '../../integration/shared/utils'; + +jest.useFakeTimers(); + +class MockWebSocket extends EventEmitter { + public isAlive = true; + + public ping = jest.fn(); + + public send = jest.fn(); + + public terminate = jest.fn(); + + public close = jest.fn(); +} + +const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked; + +describe('WebSocketPush', () => { + const sessionId1 = 'test-session1'; + const sessionId2 = 'test-session2'; + const userId: User['id'] = 'test-user'; + + mockInstance(Logger); + const webSocketPush = Container.get(WebSocketPush); + const mockWebSocket1 = createMockWebSocket(); + const mockWebSocket2 = createMockWebSocket(); + + beforeEach(() => { + jest.resetAllMocks(); + }); + + it('can add a connection', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + + expect(mockWebSocket1.listenerCount('close')).toBe(1); + expect(mockWebSocket1.listenerCount('pong')).toBe(1); + expect(mockWebSocket1.listenerCount('message')).toBe(1); + }); + + it('closes a connection', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + + mockWebSocket1.emit('close'); + + expect(mockWebSocket1.listenerCount('close')).toBe(0); + expect(mockWebSocket1.listenerCount('pong')).toBe(0); + expect(mockWebSocket1.listenerCount('message')).toBe(0); + }); + + it('sends data to one connection', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + webSocketPush.add(sessionId2, userId, mockWebSocket2); + const data: PushDataExecutionRecovered = { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }; + + webSocketPush.send('executionRecovered', data, sessionId1); + + expect(mockWebSocket1.send).toHaveBeenCalledWith( + JSON.stringify({ + type: 'executionRecovered', + data: { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }, + }), + ); + expect(mockWebSocket2.send).not.toHaveBeenCalled(); + }); + + it('sends data to all connections', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + webSocketPush.add(sessionId2, userId, mockWebSocket2); + const data: PushDataExecutionRecovered = { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }; + + webSocketPush.broadcast('executionRecovered', data); + + const expectedMsg = JSON.stringify({ + type: 'executionRecovered', + data: { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }, + }); + expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); + expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); + }); + + it('sends data to all users connections', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + webSocketPush.add(sessionId2, userId, mockWebSocket2); + const data: PushDataExecutionRecovered = { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }; + + webSocketPush.sendToUsers('executionRecovered', data, [userId]); + + const expectedMsg = JSON.stringify({ + type: 'executionRecovered', + data: { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }, + }); + expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); + expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); + }); + + it('pings all connections', () => { + webSocketPush.add(sessionId1, userId, mockWebSocket1); + webSocketPush.add(sessionId2, userId, mockWebSocket2); + + jest.runOnlyPendingTimers(); + + expect(mockWebSocket1.ping).toHaveBeenCalled(); + expect(mockWebSocket2.ping).toHaveBeenCalled(); + }); + + it('emits message event when connection receives data', () => { + const mockOnMessageReceived = jest.fn(); + webSocketPush.on('message', mockOnMessageReceived); + webSocketPush.add(sessionId1, userId, mockWebSocket1); + webSocketPush.add(sessionId2, userId, mockWebSocket2); + + const data = { test: 'data' }; + const buffer = Buffer.from(JSON.stringify(data)); + + mockWebSocket1.emit('message', buffer); + + expect(mockOnMessageReceived).toHaveBeenCalledWith({ + msg: data, + sessionId: sessionId1, + userId, + }); + }); +});