diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 472e4a2f3c..25f758185a 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -312,7 +312,6 @@ export type IPushData = | PushDataTestWebhook | PushDataNodeDescriptionUpdated | PushDataExecutionRecovered - | PushDataActiveWorkflowUsersChanged | PushDataWorkerStatusMessage | PushDataWorkflowActivated | PushDataWorkflowDeactivated @@ -333,11 +332,6 @@ type PushDataWorkflowDeactivated = { type: 'workflowDeactivated'; }; -type PushDataActiveWorkflowUsersChanged = { - data: IActiveWorkflowUsersChanged; - type: 'activeWorkflowUsersChanged'; -}; - export type PushDataExecutionRecovered = { data: IPushDataExecutionRecovered; type: 'executionRecovered'; @@ -393,20 +387,10 @@ export type PushDataNodeDescriptionUpdated = { type: 'nodeDescriptionUpdated'; }; -export interface IActiveWorkflowUser { - user: User; - lastSeen: Date; -} - export interface IActiveWorkflowAdded { workflowId: Workflow['id']; } -export interface IActiveWorkflowUsersChanged { - workflowId: Workflow['id']; - activeUsers: IActiveWorkflowUser[]; -} - interface IActiveWorkflowChanged { workflowId: Workflow['id']; } diff --git a/packages/cli/src/collaboration/collaboration.message.ts b/packages/cli/src/collaboration/collaboration.message.ts deleted file mode 100644 index 0980bdccbb..0000000000 --- a/packages/cli/src/collaboration/collaboration.message.ts +++ /dev/null @@ -1,23 +0,0 @@ -export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage; - -export type WorkflowOpenedMessage = { - type: 'workflowOpened'; - workflowId: string; -}; - -export type WorkflowClosedMessage = { - type: 'workflowClosed'; - workflowId: string; -}; - -const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => { - return typeof msg === 'object' && msg !== null && 'type' in msg; -}; - -export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => { - return isWorkflowMessage(msg) && msg.type === 'workflowOpened'; -}; - -export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => { - return isWorkflowMessage(msg) && msg.type === 'workflowClosed'; -}; diff --git a/packages/cli/src/collaboration/collaboration.service.ts b/packages/cli/src/collaboration/collaboration.service.ts deleted file mode 100644 index 6d52d26558..0000000000 --- a/packages/cli/src/collaboration/collaboration.service.ts +++ /dev/null @@ -1,109 +0,0 @@ -import type { Workflow } from 'n8n-workflow'; -import { Service } from 'typedi'; -import config from '@/config'; -import { Push } from '../push'; -import { Logger } from '@/Logger'; -import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message'; -import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message'; -import { UserService } from '../services/user.service'; -import type { IActiveWorkflowUsersChanged } from '../Interfaces'; -import type { OnPushMessageEvent } from '@/push/types'; -import { CollaborationState } from '@/collaboration/collaboration.state'; -import { TIME } from '@/constants'; -import { UserRepository } from '@/databases/repositories/user.repository'; - -/** - * After how many minutes of inactivity a user should be removed - * as being an active user of a workflow. - */ -const INACTIVITY_CLEAN_UP_TIME_IN_MS = 15 * TIME.MINUTE; - -/** - * Service for managing collaboration feature between users. E.g. keeping - * track of active users for a workflow. - */ -@Service() -export class CollaborationService { - constructor( - private readonly logger: Logger, - private readonly push: Push, - private readonly state: CollaborationState, - private readonly userService: UserService, - private readonly userRepository: UserRepository, - ) { - if (!push.isBidirectional) { - logger.warn( - 'Collaboration features are disabled because push is configured unidirectional. Use N8N_PUSH_BACKEND=websocket environment variable to enable them.', - ); - return; - } - - const isMultiMainSetup = config.get('multiMainSetup.enabled'); - if (isMultiMainSetup) { - // TODO: We should support collaboration in multi-main setup as well - // This requires using redis as the state store instead of in-memory - logger.warn('Collaboration features are disabled because multi-main setup is enabled.'); - return; - } - - this.push.on('message', async (event: OnPushMessageEvent) => { - try { - await this.handleUserMessage(event.userId, event.msg); - } catch (error) { - this.logger.error('Error handling user message', { - error: error as unknown, - msg: event.msg, - userId: event.userId, - }); - } - }); - } - - async handleUserMessage(userId: string, msg: unknown) { - if (isWorkflowOpenedMessage(msg)) { - await this.handleWorkflowOpened(userId, msg); - } else if (isWorkflowClosedMessage(msg)) { - await this.handleWorkflowClosed(userId, msg); - } - } - - private async handleWorkflowOpened(userId: string, msg: WorkflowOpenedMessage) { - const { workflowId } = msg; - - this.state.addActiveWorkflowUser(workflowId, userId); - this.state.cleanInactiveUsers(workflowId, INACTIVITY_CLEAN_UP_TIME_IN_MS); - - await this.sendWorkflowUsersChangedMessage(workflowId); - } - - private async handleWorkflowClosed(userId: string, msg: WorkflowClosedMessage) { - const { workflowId } = msg; - - this.state.removeActiveWorkflowUser(workflowId, userId); - - await this.sendWorkflowUsersChangedMessage(workflowId); - } - - private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) { - const activeWorkflowUsers = this.state.getActiveWorkflowUsers(workflowId); - const workflowUserIds = activeWorkflowUsers.map((user) => user.userId); - - if (workflowUserIds.length === 0) { - return; - } - const users = await this.userRepository.getByIds( - this.userService.getManager(), - workflowUserIds, - ); - - const msgData: IActiveWorkflowUsersChanged = { - workflowId, - activeUsers: users.map((user) => ({ - user, - lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!.lastSeen, - })), - }; - - this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds); - } -} diff --git a/packages/cli/src/collaboration/collaboration.state.ts b/packages/cli/src/collaboration/collaboration.state.ts deleted file mode 100644 index 530a529650..0000000000 --- a/packages/cli/src/collaboration/collaboration.state.ts +++ /dev/null @@ -1,79 +0,0 @@ -import type { User } from '@db/entities/User'; -import type { Workflow } from 'n8n-workflow'; -import { Service } from 'typedi'; - -type ActiveWorkflowUser = { - userId: User['id']; - lastSeen: Date; -}; - -type UserStateByUserId = Map; - -type State = { - activeUsersByWorkflowId: Map; -}; - -/** - * State management for the collaboration service - */ -@Service() -export class CollaborationState { - private state: State = { - activeUsersByWorkflowId: new Map(), - }; - - addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { - const { activeUsersByWorkflowId } = this.state; - - let activeUsers = activeUsersByWorkflowId.get(workflowId); - if (!activeUsers) { - activeUsers = new Map(); - activeUsersByWorkflowId.set(workflowId, activeUsers); - } - - activeUsers.set(userId, { - userId, - lastSeen: new Date(), - }); - } - - removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { - const { activeUsersByWorkflowId } = this.state; - - const activeUsers = activeUsersByWorkflowId.get(workflowId); - if (!activeUsers) { - return; - } - - activeUsers.delete(userId); - if (activeUsers.size === 0) { - activeUsersByWorkflowId.delete(workflowId); - } - } - - getActiveWorkflowUsers(workflowId: Workflow['id']): ActiveWorkflowUser[] { - const workflowState = this.state.activeUsersByWorkflowId.get(workflowId); - if (!workflowState) { - return []; - } - - return [...workflowState.values()]; - } - - /** - * Removes all users that have not been seen in a given time - */ - cleanInactiveUsers(workflowId: Workflow['id'], inactivityCleanUpTimeInMs: number) { - const activeUsers = this.state.activeUsersByWorkflowId.get(workflowId); - if (!activeUsers) { - return; - } - - const now = Date.now(); - for (const user of activeUsers.values()) { - if (now - user.lastSeen.getTime() > inactivityCleanUpTimeInMs) { - activeUsers.delete(user.userId); - } - } - } -} diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 18e28d7c7b..e9ada3c4bb 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,9 +1,6 @@ -import { EventEmitter } from 'events'; import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; import type { Logger } from '@/Logger'; -import type { User } from '@db/entities/User'; -import type { OrchestrationService } from '@/services/orchestration.service'; /** * Abstract class for two-way push communication. @@ -11,23 +8,16 @@ import type { OrchestrationService } from '@/services/orchestration.service'; * * @emits message when a message is received from a client */ -export abstract class AbstractPush extends EventEmitter { +export abstract class AbstractPush { protected connections: Record = {}; - protected userIdByPushRef: Record = {}; - protected abstract close(connection: T): void; protected abstract sendToOneConnection(connection: T, data: string): void; - constructor( - protected readonly logger: Logger, - private readonly orchestrationService: OrchestrationService, - ) { - super(); - } + constructor(protected readonly logger: Logger) {} - protected add(pushRef: string, userId: User['id'], connection: T) { - const { connections, userIdByPushRef } = this; + protected add(pushRef: string, connection: T) { + const { connections } = this; this.logger.debug('Add editor-UI session', { pushRef }); const existingConnection = connections[pushRef]; @@ -38,15 +28,6 @@ export abstract class AbstractPush extends EventEmitter { } connections[pushRef] = connection; - userIdByPushRef[pushRef] = userId; - } - - protected onMessageReceived(pushRef: string, msg: unknown) { - this.logger.debug('Received message from editor-UI', { pushRef, msg }); - - const userId = this.userIdByPushRef[pushRef]; - - this.emit('message', { pushRef, userId, msg }); } protected remove(pushRef?: string) { @@ -55,7 +36,6 @@ export abstract class AbstractPush extends EventEmitter { this.logger.debug('Removed editor-UI session', { pushRef }); delete this.connections[pushRef]; - delete this.userIdByPushRef[pushRef]; } private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) { @@ -77,21 +57,7 @@ export abstract class AbstractPush extends EventEmitter { this.sendTo(type, data, Object.keys(this.connections)); } - sendToOneSession(type: IPushDataType, data: unknown, pushRef: string) { - /** - * Multi-main setup: In a manual webhook execution, the main process that - * handles a webhook might not be the same as the main process that created - * the webhook. If so, the handler process commands the creator process to - * relay the former's execution lifecycle events to the creator's frontend. - */ - if (this.orchestrationService.isMultiMainSetupEnabled && !this.hasPushRef(pushRef)) { - const payload = { type, args: data, pushRef }; - - void this.orchestrationService.publish('relay-execution-lifecycle-event', payload); - - return; - } - + sendToOne(type: IPushDataType, data: unknown, pushRef: string) { if (this.connections[pushRef] === undefined) { this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef }); return; @@ -100,15 +66,6 @@ export abstract class AbstractPush extends EventEmitter { this.sendTo(type, data, [pushRef]); } - sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { - const { connections } = this; - const userPushRefs = Object.keys(connections).filter((pushRef) => - userIds.includes(this.userIdByPushRef[pushRef]), - ); - - this.sendTo(type, data, userPushRefs); - } - closeAllConnections() { for (const pushRef in this.connections) { // Signal the connection that we want to close it. diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 94c8a32bce..647ab40acf 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -6,15 +6,17 @@ import type { Application } from 'express'; import { Server as WSServer } from 'ws'; import { parse as parseUrl } from 'url'; import { Container, Service } from 'typedi'; + import config from '@/config'; -import { SSEPush } from './sse.push'; -import { WebSocketPush } from './websocket.push'; -import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; -import type { IPushDataType } from '@/Interfaces'; -import type { User } from '@db/entities/User'; import { OnShutdown } from '@/decorators/OnShutdown'; import { AuthService } from '@/auth/auth.service'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; +import type { IPushDataType } from '@/Interfaces'; +import { OrchestrationService } from '@/services/orchestration.service'; + +import { SSEPush } from './sse.push'; +import { WebSocketPush } from './websocket.push'; +import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -27,14 +29,10 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; */ @Service() export class Push extends EventEmitter { - public isBidirectional = useWebSockets; - private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); - constructor() { + constructor(private readonly orchestrationService: OrchestrationService) { super(); - - if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); } handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { @@ -54,9 +52,9 @@ export class Push extends EventEmitter { } if (req.ws) { - (this.backend as WebSocketPush).add(pushRef, user.id, req.ws); + (this.backend as WebSocketPush).add(pushRef, req.ws); } else if (!useWebSockets) { - (this.backend as SSEPush).add(pushRef, user.id, { req, res }); + (this.backend as SSEPush).add(pushRef, { req, res }); } else { res.status(401).send('Unauthorized'); return; @@ -70,17 +68,25 @@ export class Push extends EventEmitter { } send(type: IPushDataType, data: unknown, pushRef: string) { - this.backend.sendToOneSession(type, data, pushRef); + /** + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, the handler process commands the creator process to + * relay the former's execution lifecycle events to the creator's frontend. + */ + if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) { + const payload = { type, args: data, pushRef }; + void this.orchestrationService.publish('relay-execution-lifecycle-event', payload); + return; + } + + this.backend.sendToOne(type, data, pushRef); } getBackend() { return this.backend; } - sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { - this.backend.sendToUsers(type, data, userIds); - } - @OnShutdown() onShutdown() { this.backend.closeAllConnections(); diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index d57b07ea1f..8367c74507 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -1,10 +1,10 @@ import SSEChannel from 'sse-channel'; import { Service } from 'typedi'; + import { Logger } from '@/Logger'; + import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; -import type { User } from '@db/entities/User'; -import { OrchestrationService } from '@/services/orchestration.service'; type Connection = { req: PushRequest; res: PushResponse }; @@ -14,16 +14,16 @@ export class SSEPush extends AbstractPush { readonly connections: Record = {}; - constructor(logger: Logger, orchestrationService: OrchestrationService) { - super(logger, orchestrationService); + constructor(logger: Logger) { + super(logger); this.channel.on('disconnect', (_, { req }) => { this.remove(req?.query?.pushRef); }); } - add(pushRef: string, userId: User['id'], connection: Connection) { - super.add(pushRef, userId, connection); + add(pushRef: string, connection: Connection) { + super.add(pushRef, connection); this.channel.addClient(connection.req, connection.res); } diff --git a/packages/cli/src/push/types.ts b/packages/cli/src/push/types.ts index 4068192b6c..a12e582213 100644 --- a/packages/cli/src/push/types.ts +++ b/packages/cli/src/push/types.ts @@ -1,7 +1,6 @@ import type { Response } from 'express'; import type { WebSocket } from 'ws'; -import type { User } from '@db/entities/User'; import type { AuthenticatedRequest } from '@/requests'; // TODO: move all push related types here @@ -12,9 +11,3 @@ export type SSEPushRequest = PushRequest & { ws: undefined }; export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; export type PushResponse = Response & { req: PushRequest }; - -export type OnPushMessageEvent = { - pushRef: string; - userId: User['id']; - msg: unknown; -}; diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 49c081b363..04815038ce 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -2,8 +2,6 @@ import type WebSocket from 'ws'; import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; -import type { User } from '@db/entities/User'; -import { OrchestrationService } from '@/services/orchestration.service'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -11,41 +9,24 @@ function heartbeat(this: WebSocket) { @Service() export class WebSocketPush extends AbstractPush { - constructor(logger: Logger, orchestrationService: OrchestrationService) { - super(logger, orchestrationService); + constructor(logger: Logger) { + super(logger); // Ping all connected clients every 60 seconds setInterval(() => this.pingAll(), 60 * 1000); } - add(pushRef: string, userId: User['id'], connection: WebSocket) { + add(pushRef: string, connection: WebSocket) { connection.isAlive = true; connection.on('pong', heartbeat); - super.add(pushRef, userId, connection); - - const onMessage = (data: WebSocket.RawData) => { - try { - const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); - - this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8'))); - } catch (error) { - this.logger.error("Couldn't parse message from editor-UI", { - error: error as unknown, - pushRef, - data, - }); - } - }; + super.add(pushRef, connection); // Makes sure to remove the session if the connection is closed connection.once('close', () => { connection.off('pong', heartbeat); - connection.off('message', onMessage); this.remove(pushRef); }); - - connection.on('message', onMessage); } protected close(connection: WebSocket): void { diff --git a/packages/cli/test/unit/collaboration/collaboration.service.test.ts b/packages/cli/test/unit/collaboration/collaboration.service.test.ts deleted file mode 100644 index 3edc080bc9..0000000000 --- a/packages/cli/test/unit/collaboration/collaboration.service.test.ts +++ /dev/null @@ -1,151 +0,0 @@ -import { CollaborationService } from '@/collaboration/collaboration.service'; -import type { Logger } from '@/Logger'; -import type { User } from '@db/entities/User'; -import type { UserService } from '@/services/user.service'; -import { CollaborationState } from '@/collaboration/collaboration.state'; -import type { Push } from '@/push'; -import type { - WorkflowClosedMessage, - WorkflowOpenedMessage, -} from '@/collaboration/collaboration.message'; -import type { UserRepository } from '@/databases/repositories/user.repository'; -import { mock } from 'jest-mock-extended'; - -describe('CollaborationService', () => { - let collaborationService: CollaborationService; - let mockLogger: Logger; - let mockUserService: jest.Mocked; - let mockUserRepository: jest.Mocked; - let state: CollaborationState; - let push: Push; - - beforeEach(() => { - mockLogger = mock(); - mockUserService = mock(); - mockUserRepository = mock(); - push = mock(); - state = new CollaborationState(); - - collaborationService = new CollaborationService( - mockLogger, - push, - state, - mockUserService, - mockUserRepository, - ); - }); - - describe('workflow opened message', () => { - const userId = 'test-user'; - const workflowId = 'test-workflow'; - - const message: WorkflowOpenedMessage = { - type: 'workflowOpened', - workflowId, - }; - - const expectActiveUsersChangedMessage = () => { - expect(push.sendToUsers).toHaveBeenCalledWith( - 'activeWorkflowUsersChanged', - { - workflowId, - activeUsers: [ - { - user: { id: userId }, - lastSeen: expect.any(Date), - }, - ], - }, - [userId], - ); - }; - - describe('user is not yet active', () => { - it('updates state correctly', async () => { - mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]); - await collaborationService.handleUserMessage(userId, message); - - expect(state.getActiveWorkflowUsers(workflowId)).toEqual([ - { - lastSeen: expect.any(Date), - userId, - }, - ]); - }); - - it('sends active workflow users changed message', async () => { - mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]); - await collaborationService.handleUserMessage(userId, message); - - expectActiveUsersChangedMessage(); - }); - }); - - describe('user is already active', () => { - beforeEach(() => { - state.addActiveWorkflowUser(workflowId, userId); - }); - - it('updates state correctly', async () => { - mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]); - await collaborationService.handleUserMessage(userId, message); - - expect(state.getActiveWorkflowUsers(workflowId)).toEqual([ - { - lastSeen: expect.any(Date), - userId, - }, - ]); - }); - - it('sends active workflow users changed message', async () => { - mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]); - await collaborationService.handleUserMessage(userId, message); - - expectActiveUsersChangedMessage(); - }); - }); - }); - - describe('workflow closed message', () => { - const userId = 'test-user'; - const workflowId = 'test-workflow'; - - const message: WorkflowClosedMessage = { - type: 'workflowClosed', - workflowId, - }; - - describe('user is active', () => { - beforeEach(() => { - state.addActiveWorkflowUser(workflowId, userId); - }); - - it('updates state correctly', async () => { - await collaborationService.handleUserMessage(userId, message); - - expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]); - }); - - it('does not send active workflow users changed message', async () => { - await collaborationService.handleUserMessage(userId, message); - - expect(push.sendToUsers).not.toHaveBeenCalled(); - }); - }); - - describe('user is not active', () => { - it('updates state correctly', async () => { - await collaborationService.handleUserMessage(userId, message); - - expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]); - }); - - it('does not send active workflow users changed message', async () => { - await collaborationService.handleUserMessage(userId, message); - - expect(push.sendToUsers).not.toHaveBeenCalled(); - }); - }); - }); -}); diff --git a/packages/cli/test/unit/collaboration/collaboration.state.test.ts b/packages/cli/test/unit/collaboration/collaboration.state.test.ts deleted file mode 100644 index b390bf9ad9..0000000000 --- a/packages/cli/test/unit/collaboration/collaboration.state.test.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { TIME } from '@/constants'; -import { CollaborationState } from '@/collaboration/collaboration.state'; - -const origDate = global.Date; - -const mockDateFactory = (currentDate: string) => { - return class CustomDate extends origDate { - constructor() { - super(currentDate); - } - } as DateConstructor; -}; - -describe('CollaborationState', () => { - let collaborationState: CollaborationState; - - beforeEach(() => { - collaborationState = new CollaborationState(); - }); - - describe('cleanInactiveUsers', () => { - const workflowId = 'workflow'; - - it('should remove inactive users', () => { - // Setup - global.Date = mockDateFactory('2023-01-01T00:00:00.000Z'); - collaborationState.addActiveWorkflowUser(workflowId, 'inactiveUser'); - - global.Date = mockDateFactory('2023-01-01T00:30:00.000Z'); - collaborationState.addActiveWorkflowUser(workflowId, 'activeUser'); - - // Act: Clean inactive users - jest - .spyOn(global.Date, 'now') - .mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime()); - collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE); - - // Assert: The inactive user should be removed - expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([ - { userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') }, - ]); - }); - - it('should not remove active users', () => { - // Setup: Add an active user to the state - global.Date = mockDateFactory('2023-01-01T00:30:00.000Z'); - collaborationState.addActiveWorkflowUser(workflowId, 'activeUser'); - - // Act: Clean inactive users - jest - .spyOn(global.Date, 'now') - .mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime()); - collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE); - - // Assert: The active user should still be present - expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([ - { userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') }, - ]); - }); - }); -}); diff --git a/packages/cli/test/unit/push/index.test.ts b/packages/cli/test/unit/push/index.test.ts index 390ee100e7..1736693509 100644 --- a/packages/cli/test/unit/push/index.test.ts +++ b/packages/cli/test/unit/push/index.test.ts @@ -1,14 +1,16 @@ import type { WebSocket } from 'ws'; +import { mock } from 'jest-mock-extended'; + import config from '@/config'; import type { User } from '@db/entities/User'; import { Push } from '@/push'; import { SSEPush } from '@/push/sse.push'; import { WebSocketPush } from '@/push/websocket.push'; import type { WebSocketPushRequest, SSEPushRequest } from '@/push/types'; -import { mockInstance } from '../../shared/mocking'; -import { mock } from 'jest-mock-extended'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; +import { mockInstance } from '../../shared/mocking'; + jest.unmock('@/push'); describe('Push', () => { @@ -19,7 +21,7 @@ describe('Push', () => { test('should validate pushRef on requests for websocket backend', () => { config.set('push.backend', 'websocket'); - const push = new Push(); + const push = new Push(mock()); const ws = mock(); const request = mock({ user, ws }); request.query = { pushRef: '' }; @@ -32,7 +34,7 @@ describe('Push', () => { test('should validate pushRef on requests for SSE backend', () => { config.set('push.backend', 'sse'); - const push = new Push(); + const push = new Push(mock()); const request = mock({ user, ws: undefined }); request.query = { pushRef: '' }; expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError); diff --git a/packages/cli/test/unit/push/websocket.push.test.ts b/packages/cli/test/unit/push/websocket.push.test.ts index 47f67a6847..7531e43776 100644 --- a/packages/cli/test/unit/push/websocket.push.test.ts +++ b/packages/cli/test/unit/push/websocket.push.test.ts @@ -1,10 +1,11 @@ -import Container from 'typedi'; +import { Container } from 'typedi'; import { EventEmitter } from 'events'; import type WebSocket from 'ws'; + import { WebSocketPush } from '@/push/websocket.push'; import { Logger } from '@/Logger'; -import type { User } from '@db/entities/User'; import type { PushDataExecutionRecovered } from '@/Interfaces'; + import { mockInstance } from '../../shared/mocking'; jest.useFakeTimers(); @@ -26,7 +27,6 @@ const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked< describe('WebSocketPush', () => { const pushRef1 = 'test-session1'; const pushRef2 = 'test-session2'; - const userId: User['id'] = 'test-user'; mockInstance(Logger); const webSocketPush = Container.get(WebSocketPush); @@ -38,26 +38,24 @@ describe('WebSocketPush', () => { }); it('can add a connection', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef1, mockWebSocket1); expect(mockWebSocket1.listenerCount('close')).toBe(1); expect(mockWebSocket1.listenerCount('pong')).toBe(1); - expect(mockWebSocket1.listenerCount('message')).toBe(1); }); it('closes a connection', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef1, mockWebSocket1); mockWebSocket1.emit('close'); expect(mockWebSocket1.listenerCount('close')).toBe(0); expect(mockWebSocket1.listenerCount('pong')).toBe(0); - expect(mockWebSocket1.listenerCount('message')).toBe(0); }); it('sends data to one connection', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); - webSocketPush.add(pushRef2, userId, mockWebSocket2); + webSocketPush.add(pushRef1, mockWebSocket1); + webSocketPush.add(pushRef2, mockWebSocket2); const data: PushDataExecutionRecovered = { type: 'executionRecovered', data: { @@ -65,7 +63,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.sendToOneSession('executionRecovered', data, pushRef1); + webSocketPush.sendToOne('executionRecovered', data, pushRef1); expect(mockWebSocket1.send).toHaveBeenCalledWith( JSON.stringify({ @@ -82,8 +80,8 @@ describe('WebSocketPush', () => { }); it('sends data to all connections', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); - webSocketPush.add(pushRef2, userId, mockWebSocket2); + webSocketPush.add(pushRef1, mockWebSocket1); + webSocketPush.add(pushRef2, mockWebSocket2); const data: PushDataExecutionRecovered = { type: 'executionRecovered', data: { @@ -106,56 +104,13 @@ describe('WebSocketPush', () => { expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); }); - it('sends data to all users connections', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); - webSocketPush.add(pushRef2, userId, mockWebSocket2); - const data: PushDataExecutionRecovered = { - type: 'executionRecovered', - data: { - executionId: 'test-execution-id', - }, - }; - - webSocketPush.sendToUsers('executionRecovered', data, [userId]); - - const expectedMsg = JSON.stringify({ - type: 'executionRecovered', - data: { - type: 'executionRecovered', - data: { - executionId: 'test-execution-id', - }, - }, - }); - expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); - expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); - }); - it('pings all connections', () => { - webSocketPush.add(pushRef1, userId, mockWebSocket1); - webSocketPush.add(pushRef2, userId, mockWebSocket2); + webSocketPush.add(pushRef1, mockWebSocket1); + webSocketPush.add(pushRef2, mockWebSocket2); jest.runOnlyPendingTimers(); expect(mockWebSocket1.ping).toHaveBeenCalled(); expect(mockWebSocket2.ping).toHaveBeenCalled(); }); - - it('emits message event when connection receives data', () => { - const mockOnMessageReceived = jest.fn(); - webSocketPush.on('message', mockOnMessageReceived); - webSocketPush.add(pushRef1, userId, mockWebSocket1); - webSocketPush.add(pushRef2, userId, mockWebSocket2); - - const data = { test: 'data' }; - const buffer = Buffer.from(JSON.stringify(data)); - - mockWebSocket1.emit('message', buffer); - - expect(mockOnMessageReceived).toHaveBeenCalledWith({ - msg: data, - pushRef: pushRef1, - userId, - }); - }); }); diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index c58ecd7c4b..f888e5b4d6 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -432,16 +432,6 @@ export interface IExecutionDeleteFilter { ids?: string[]; } -export type PushDataUsersForWorkflow = { - workflowId: string; - activeUsers: Array<{ user: IUser; lastSeen: string }>; -}; - -type PushDataWorkflowUsersChanged = { - data: PushDataUsersForWorkflow; - type: 'activeWorkflowUsersChanged'; -}; - export type IPushData = | PushDataExecutionFinished | PushDataExecutionStarted @@ -456,8 +446,7 @@ export type IPushData = | PushDataWorkerStatusMessage | PushDataActiveWorkflowAdded | PushDataActiveWorkflowRemoved - | PushDataWorkflowFailedToActivate - | PushDataWorkflowUsersChanged; + | PushDataWorkflowFailedToActivate; export type PushDataActiveWorkflowAdded = { data: IActiveWorkflowAdded; diff --git a/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue b/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue deleted file mode 100644 index daabc0a178..0000000000 --- a/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue +++ /dev/null @@ -1,82 +0,0 @@ - - - - - diff --git a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue index babf3b36df..1b172b700d 100644 --- a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue +++ b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue @@ -22,7 +22,6 @@ import SaveButton from '@/components/SaveButton.vue'; import TagsDropdown from '@/components/TagsDropdown.vue'; import InlineTextEdit from '@/components/InlineTextEdit.vue'; import BreakpointsObserver from '@/components/BreakpointsObserver.vue'; -import CollaborationPane from '@/components/MainHeader/CollaborationPane.vue'; import { useRootStore } from '@/stores/root.store'; import { useSettingsStore } from '@/stores/settings.store'; @@ -622,7 +621,6 @@ function showCreateWorkflowSuccessToast(id?: string) {
- { - afterEach(() => { - vi.clearAllMocks(); - }); - - it('should show only current workflow users', async () => { - const { getByTestId, queryByTestId } = renderComponent(); - await waitAllPromises(); - - expect(getByTestId('collaboration-pane')).toBeInTheDocument(); - expect(getByTestId('user-stack-avatars')).toBeInTheDocument(); - expect(getByTestId(`user-stack-avatar-${OWNER_USER.id}`)).toBeInTheDocument(); - expect(getByTestId(`user-stack-avatar-${MEMBER_USER.id}`)).toBeInTheDocument(); - expect(queryByTestId(`user-stack-avatar-${MEMBER_USER_2.id}`)).toBeNull(); - }); - - it('should render current user correctly', async () => { - const { getByText, queryByText } = renderComponent(); - await waitAllPromises(); - expect(getByText(`${OWNER_USER.fullName} (you)`)).toBeInTheDocument(); - expect(queryByText(`${MEMBER_USER.fullName} (you)`)).toBeNull(); - expect(queryByText(`${MEMBER_USER.fullName}`)).toBeInTheDocument(); - }); - - it('should always render owner first in the list', async () => { - const { getByTestId } = renderComponent(); - await waitAllPromises(); - const firstAvatar = getByTestId('user-stack-avatars').querySelector('.n8n-avatar'); - // Owner is second in the store bur shourld be rendered first - expect(firstAvatar).toHaveAttribute('data-test-id', `user-stack-avatar-${OWNER_USER.id}`); - }); -}); diff --git a/packages/editor-ui/src/composables/__tests__/usePushConnection.spec.ts b/packages/editor-ui/src/composables/__tests__/usePushConnection.spec.ts index ce88b2b00f..68a1f607ea 100644 --- a/packages/editor-ui/src/composables/__tests__/usePushConnection.spec.ts +++ b/packages/editor-ui/src/composables/__tests__/usePushConnection.spec.ts @@ -2,7 +2,6 @@ import { usePushConnection } from '@/composables/usePushConnection'; import { useRouter } from 'vue-router'; import { createPinia, setActivePinia } from 'pinia'; import { usePushConnectionStore } from '@/stores/pushConnection.store'; -import { useCollaborationStore } from '@/stores/collaboration.store'; import type { IPushData, PushDataWorkerStatusMessage } from '@/Interface'; import { useOrchestrationStore } from '@/stores/orchestration.store'; @@ -21,7 +20,6 @@ vi.useFakeTimers(); describe('usePushConnection()', () => { let router: ReturnType; let pushStore: ReturnType; - let collaborationStore: ReturnType; let orchestrationStore: ReturnType; let pushConnection: ReturnType; @@ -30,7 +28,6 @@ describe('usePushConnection()', () => { router = vi.mocked(useRouter)(); pushStore = usePushConnectionStore(); - collaborationStore = useCollaborationStore(); orchestrationStore = useOrchestrationStore(); pushConnection = usePushConnection({ router }); }); @@ -43,12 +40,6 @@ describe('usePushConnection()', () => { expect(spy).toHaveBeenCalled(); }); - - it('should initialize collaborationStore', () => { - const spy = vi.spyOn(collaborationStore, 'initialize').mockImplementation(() => {}); - pushConnection.initialize(); - expect(spy).toHaveBeenCalled(); - }); }); describe('terminate()', () => { @@ -61,12 +52,6 @@ describe('usePushConnection()', () => { expect(returnFn).toHaveBeenCalled(); }); - - it('should terminate collaborationStore', () => { - const spy = vi.spyOn(collaborationStore, 'terminate').mockImplementation(() => {}); - pushConnection.terminate(); - expect(spy).toHaveBeenCalled(); - }); }); describe('queuePushMessage()', () => { diff --git a/packages/editor-ui/src/composables/usePushConnection.ts b/packages/editor-ui/src/composables/usePushConnection.ts index a903647171..929f2d61bc 100644 --- a/packages/editor-ui/src/composables/usePushConnection.ts +++ b/packages/editor-ui/src/composables/usePushConnection.ts @@ -35,7 +35,6 @@ import { parse } from 'flatted'; import { ref } from 'vue'; import { useOrchestrationStore } from '@/stores/orchestration.store'; import { usePushConnectionStore } from '@/stores/pushConnection.store'; -import { useCollaborationStore } from '@/stores/collaboration.store'; import { useExternalHooks } from '@/composables/useExternalHooks'; import type { useRouter } from 'vue-router'; import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers'; @@ -51,7 +50,6 @@ export function usePushConnection({ router }: { router: ReturnType { void pushMessageReceived(message); }); - collaborationStore.initialize(); } function terminate() { - collaborationStore.terminate(); if (typeof removeEventListener.value === 'function') { removeEventListener.value(); } diff --git a/packages/editor-ui/src/constants.ts b/packages/editor-ui/src/constants.ts index 3d007e51a8..64287b7281 100644 --- a/packages/editor-ui/src/constants.ts +++ b/packages/editor-ui/src/constants.ts @@ -637,7 +637,6 @@ export const enum STORES { HISTORY = 'history', CLOUD_PLAN = 'cloudPlan', RBAC = 'rbac', - COLLABORATION = 'collaboration', PUSH = 'push', BECOME_TEMPLATE_CREATOR = 'becomeTemplateCreator', } diff --git a/packages/editor-ui/src/stores/collaboration.store.ts b/packages/editor-ui/src/stores/collaboration.store.ts deleted file mode 100644 index 208d5fa588..0000000000 --- a/packages/editor-ui/src/stores/collaboration.store.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { defineStore } from 'pinia'; -import { computed, ref } from 'vue'; -import { useWorkflowsStore } from '@/stores/workflows.store'; -import { usePushConnectionStore } from '@/stores/pushConnection.store'; -import { STORES } from '@/constants'; -import type { IUser } from '@/Interface'; - -type ActiveUsersForWorkflows = { - [workflowId: string]: Array<{ user: IUser; lastSeen: string }>; -}; - -export const useCollaborationStore = defineStore(STORES.COLLABORATION, () => { - const pushStore = usePushConnectionStore(); - const workflowStore = useWorkflowsStore(); - - const usersForWorkflows = ref({}); - const pushStoreEventListenerRemovalFn = ref<(() => void) | null>(null); - - const getUsersForCurrentWorkflow = computed(() => { - return usersForWorkflows.value[workflowStore.workflowId] ?? []; - }); - - function initialize() { - pushStoreEventListenerRemovalFn.value = pushStore.addEventListener((event) => { - if (event.type === 'activeWorkflowUsersChanged') { - const activeWorkflowId = workflowStore.workflowId; - if (event.data.workflowId === activeWorkflowId) { - usersForWorkflows.value[activeWorkflowId] = event.data.activeUsers; - } - } - }); - } - - function terminate() { - if (typeof pushStoreEventListenerRemovalFn.value === 'function') { - pushStoreEventListenerRemovalFn.value(); - } - } - - function workflowUsersUpdated(data: ActiveUsersForWorkflows) { - usersForWorkflows.value = data; - } - - function notifyWorkflowOpened(workflowId: string) { - pushStore.send({ - type: 'workflowOpened', - workflowId, - }); - } - - function notifyWorkflowClosed(workflowId: string) { - pushStore.send({ type: 'workflowClosed', workflowId }); - } - - return { - usersForWorkflows, - initialize, - terminate, - notifyWorkflowOpened, - notifyWorkflowClosed, - workflowUsersUpdated, - getUsersForCurrentWorkflow, - }; -}); diff --git a/packages/editor-ui/src/views/NodeView.v2.vue b/packages/editor-ui/src/views/NodeView.v2.vue index a620f6a97d..2099540573 100644 --- a/packages/editor-ui/src/views/NodeView.v2.vue +++ b/packages/editor-ui/src/views/NodeView.v2.vue @@ -36,7 +36,6 @@ import { useCredentialsStore } from '@/stores/credentials.store'; import useEnvironmentsStore from '@/stores/environments.ee.store'; import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store'; import { useRootStore } from '@/stores/root.store'; -import { useCollaborationStore } from '@/stores/collaboration.store'; import { historyBus } from '@/models/history'; import { useCanvasOperations } from '@/composables/useCanvasOperations'; import { useExecutionsStore } from '@/stores/executions.store'; @@ -79,7 +78,6 @@ const credentialsStore = useCredentialsStore(); const environmentsStore = useEnvironmentsStore(); const externalSecretsStore = useExternalSecretsStore(); const rootStore = useRootStore(); -const collaborationStore = useCollaborationStore(); const executionsStore = useExecutionsStore(); const canvasStore = useCanvasStore(); const npsSurveyStore = useNpsSurveyStore(); @@ -173,7 +171,6 @@ async function initializeData() { workflowId: workflowsStore.workflow.id, workflowName: workflowsStore.workflow.name, }); - collaborationStore.notifyWorkflowOpened(workflowsStore.workflow.id); const selectedExecution = executionsStore.activeExecution; if (selectedExecution?.workflowId !== workflowsStore.workflow.id) { diff --git a/packages/editor-ui/src/views/NodeView.vue b/packages/editor-ui/src/views/NodeView.vue index 9e08d8dfbf..43aab6d58c 100644 --- a/packages/editor-ui/src/views/NodeView.vue +++ b/packages/editor-ui/src/views/NodeView.vue @@ -246,7 +246,6 @@ import { AI_NODE_CREATOR_VIEW, DRAG_EVENT_DATA_KEY, UPDATE_WEBHOOK_ID_NODE_TYPES, - TIME, AI_ASSISTANT_LOCAL_STORAGE_KEY, CANVAS_AUTO_ADD_MANUAL_TRIGGER_EXPERIMENT, } from '@/constants'; @@ -322,7 +321,6 @@ import type { import { type RouteLocation, useRouter } from 'vue-router'; import { dataPinningEventBus, nodeViewEventBus } from '@/event-bus'; import { useCanvasStore } from '@/stores/canvas.store'; -import { useCollaborationStore } from '@/stores/collaboration.store'; import { useCredentialsStore } from '@/stores/credentials.store'; import { useEnvironmentsStore } from '@/stores/environments.ee.store'; import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store'; @@ -471,18 +469,15 @@ export default defineComponent({ await this.$router.push(to); } else { - this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow); next(); } } else if (confirmModal === MODAL_CANCEL) { - this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow); this.workflowsStore.setWorkflowId(PLACEHOLDER_EMPTY_WORKFLOW_ID); this.resetWorkspace(); this.uiStore.stateIsDirty = false; next(); } } else { - this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow); next(); } }, @@ -588,7 +583,6 @@ export default defineComponent({ useWorkflowsEEStore, useHistoryStore, useExternalSecretsStore, - useCollaborationStore, usePushConnectionStore, useSourceControlStore, useExecutionsStore, @@ -1035,7 +1029,6 @@ export default defineComponent({ if (!this.isDemo) { this.pushStore.pushConnect(); } - this.collaborationStore.initialize(); }, beforeUnmount() { // Make sure the event listeners get removed again else we @@ -1049,7 +1042,6 @@ export default defineComponent({ if (!this.isDemo) { this.pushStore.pushDisconnect(); } - this.collaborationStore.terminate(); this.resetWorkspace(); this.instance.unbind(); @@ -1506,7 +1498,6 @@ export default defineComponent({ this.executionsStore.activeExecution = selectedExecution; } this.canvasStore.stopLoading(); - this.collaborationStore.notifyWorkflowOpened(workflow.id); }, touchTap(e: MouseEvent | TouchEvent) { if (this.deviceSupport.isTouchDevice) { @@ -3681,18 +3672,10 @@ export default defineComponent({ if (this.isDemo || window.preventNodeViewBeforeUnload) { return; } else if (this.uiStore.stateIsDirty) { - // A bit hacky solution to detecting users leaving the page after prompt: - // 1. Notify that workflow is closed straight away - this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId); - // 2. If user decided to stay on the page we notify that the workflow is opened again - this.unloadTimeout = setTimeout(() => { - this.collaborationStore.notifyWorkflowOpened(this.workflowsStore.workflowId); - }, 5 * TIME.SECOND); e.returnValue = true; //Gecko + IE return true; //Gecko + Webkit, Safari, Chrome etc. } else { this.canvasStore.startLoading(this.$locale.baseText('nodeView.redirecting')); - this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId); return; } },