mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 18:12:04 +00:00
refactor(core): Delete all collaboration related code (no-changelog) (#9929)
This commit is contained in:
committed by
GitHub
parent
24091dfd9b
commit
22990342df
@@ -312,7 +312,6 @@ export type IPushData =
|
|||||||
| PushDataTestWebhook
|
| PushDataTestWebhook
|
||||||
| PushDataNodeDescriptionUpdated
|
| PushDataNodeDescriptionUpdated
|
||||||
| PushDataExecutionRecovered
|
| PushDataExecutionRecovered
|
||||||
| PushDataActiveWorkflowUsersChanged
|
|
||||||
| PushDataWorkerStatusMessage
|
| PushDataWorkerStatusMessage
|
||||||
| PushDataWorkflowActivated
|
| PushDataWorkflowActivated
|
||||||
| PushDataWorkflowDeactivated
|
| PushDataWorkflowDeactivated
|
||||||
@@ -333,11 +332,6 @@ type PushDataWorkflowDeactivated = {
|
|||||||
type: 'workflowDeactivated';
|
type: 'workflowDeactivated';
|
||||||
};
|
};
|
||||||
|
|
||||||
type PushDataActiveWorkflowUsersChanged = {
|
|
||||||
data: IActiveWorkflowUsersChanged;
|
|
||||||
type: 'activeWorkflowUsersChanged';
|
|
||||||
};
|
|
||||||
|
|
||||||
export type PushDataExecutionRecovered = {
|
export type PushDataExecutionRecovered = {
|
||||||
data: IPushDataExecutionRecovered;
|
data: IPushDataExecutionRecovered;
|
||||||
type: 'executionRecovered';
|
type: 'executionRecovered';
|
||||||
@@ -393,20 +387,10 @@ export type PushDataNodeDescriptionUpdated = {
|
|||||||
type: 'nodeDescriptionUpdated';
|
type: 'nodeDescriptionUpdated';
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface IActiveWorkflowUser {
|
|
||||||
user: User;
|
|
||||||
lastSeen: Date;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface IActiveWorkflowAdded {
|
export interface IActiveWorkflowAdded {
|
||||||
workflowId: Workflow['id'];
|
workflowId: Workflow['id'];
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IActiveWorkflowUsersChanged {
|
|
||||||
workflowId: Workflow['id'];
|
|
||||||
activeUsers: IActiveWorkflowUser[];
|
|
||||||
}
|
|
||||||
|
|
||||||
interface IActiveWorkflowChanged {
|
interface IActiveWorkflowChanged {
|
||||||
workflowId: Workflow['id'];
|
workflowId: Workflow['id'];
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,23 +0,0 @@
|
|||||||
export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;
|
|
||||||
|
|
||||||
export type WorkflowOpenedMessage = {
|
|
||||||
type: 'workflowOpened';
|
|
||||||
workflowId: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
export type WorkflowClosedMessage = {
|
|
||||||
type: 'workflowClosed';
|
|
||||||
workflowId: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => {
|
|
||||||
return typeof msg === 'object' && msg !== null && 'type' in msg;
|
|
||||||
};
|
|
||||||
|
|
||||||
export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => {
|
|
||||||
return isWorkflowMessage(msg) && msg.type === 'workflowOpened';
|
|
||||||
};
|
|
||||||
|
|
||||||
export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => {
|
|
||||||
return isWorkflowMessage(msg) && msg.type === 'workflowClosed';
|
|
||||||
};
|
|
||||||
@@ -1,109 +0,0 @@
|
|||||||
import type { Workflow } from 'n8n-workflow';
|
|
||||||
import { Service } from 'typedi';
|
|
||||||
import config from '@/config';
|
|
||||||
import { Push } from '../push';
|
|
||||||
import { Logger } from '@/Logger';
|
|
||||||
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
|
|
||||||
import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message';
|
|
||||||
import { UserService } from '../services/user.service';
|
|
||||||
import type { IActiveWorkflowUsersChanged } from '../Interfaces';
|
|
||||||
import type { OnPushMessageEvent } from '@/push/types';
|
|
||||||
import { CollaborationState } from '@/collaboration/collaboration.state';
|
|
||||||
import { TIME } from '@/constants';
|
|
||||||
import { UserRepository } from '@/databases/repositories/user.repository';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* After how many minutes of inactivity a user should be removed
|
|
||||||
* as being an active user of a workflow.
|
|
||||||
*/
|
|
||||||
const INACTIVITY_CLEAN_UP_TIME_IN_MS = 15 * TIME.MINUTE;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Service for managing collaboration feature between users. E.g. keeping
|
|
||||||
* track of active users for a workflow.
|
|
||||||
*/
|
|
||||||
@Service()
|
|
||||||
export class CollaborationService {
|
|
||||||
constructor(
|
|
||||||
private readonly logger: Logger,
|
|
||||||
private readonly push: Push,
|
|
||||||
private readonly state: CollaborationState,
|
|
||||||
private readonly userService: UserService,
|
|
||||||
private readonly userRepository: UserRepository,
|
|
||||||
) {
|
|
||||||
if (!push.isBidirectional) {
|
|
||||||
logger.warn(
|
|
||||||
'Collaboration features are disabled because push is configured unidirectional. Use N8N_PUSH_BACKEND=websocket environment variable to enable them.',
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const isMultiMainSetup = config.get('multiMainSetup.enabled');
|
|
||||||
if (isMultiMainSetup) {
|
|
||||||
// TODO: We should support collaboration in multi-main setup as well
|
|
||||||
// This requires using redis as the state store instead of in-memory
|
|
||||||
logger.warn('Collaboration features are disabled because multi-main setup is enabled.');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.push.on('message', async (event: OnPushMessageEvent) => {
|
|
||||||
try {
|
|
||||||
await this.handleUserMessage(event.userId, event.msg);
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error('Error handling user message', {
|
|
||||||
error: error as unknown,
|
|
||||||
msg: event.msg,
|
|
||||||
userId: event.userId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async handleUserMessage(userId: string, msg: unknown) {
|
|
||||||
if (isWorkflowOpenedMessage(msg)) {
|
|
||||||
await this.handleWorkflowOpened(userId, msg);
|
|
||||||
} else if (isWorkflowClosedMessage(msg)) {
|
|
||||||
await this.handleWorkflowClosed(userId, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async handleWorkflowOpened(userId: string, msg: WorkflowOpenedMessage) {
|
|
||||||
const { workflowId } = msg;
|
|
||||||
|
|
||||||
this.state.addActiveWorkflowUser(workflowId, userId);
|
|
||||||
this.state.cleanInactiveUsers(workflowId, INACTIVITY_CLEAN_UP_TIME_IN_MS);
|
|
||||||
|
|
||||||
await this.sendWorkflowUsersChangedMessage(workflowId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async handleWorkflowClosed(userId: string, msg: WorkflowClosedMessage) {
|
|
||||||
const { workflowId } = msg;
|
|
||||||
|
|
||||||
this.state.removeActiveWorkflowUser(workflowId, userId);
|
|
||||||
|
|
||||||
await this.sendWorkflowUsersChangedMessage(workflowId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
|
|
||||||
const activeWorkflowUsers = this.state.getActiveWorkflowUsers(workflowId);
|
|
||||||
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);
|
|
||||||
|
|
||||||
if (workflowUserIds.length === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const users = await this.userRepository.getByIds(
|
|
||||||
this.userService.getManager(),
|
|
||||||
workflowUserIds,
|
|
||||||
);
|
|
||||||
|
|
||||||
const msgData: IActiveWorkflowUsersChanged = {
|
|
||||||
workflowId,
|
|
||||||
activeUsers: users.map((user) => ({
|
|
||||||
user,
|
|
||||||
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!.lastSeen,
|
|
||||||
})),
|
|
||||||
};
|
|
||||||
|
|
||||||
this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,79 +0,0 @@
|
|||||||
import type { User } from '@db/entities/User';
|
|
||||||
import type { Workflow } from 'n8n-workflow';
|
|
||||||
import { Service } from 'typedi';
|
|
||||||
|
|
||||||
type ActiveWorkflowUser = {
|
|
||||||
userId: User['id'];
|
|
||||||
lastSeen: Date;
|
|
||||||
};
|
|
||||||
|
|
||||||
type UserStateByUserId = Map<User['id'], ActiveWorkflowUser>;
|
|
||||||
|
|
||||||
type State = {
|
|
||||||
activeUsersByWorkflowId: Map<Workflow['id'], UserStateByUserId>;
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* State management for the collaboration service
|
|
||||||
*/
|
|
||||||
@Service()
|
|
||||||
export class CollaborationState {
|
|
||||||
private state: State = {
|
|
||||||
activeUsersByWorkflowId: new Map(),
|
|
||||||
};
|
|
||||||
|
|
||||||
addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
|
|
||||||
const { activeUsersByWorkflowId } = this.state;
|
|
||||||
|
|
||||||
let activeUsers = activeUsersByWorkflowId.get(workflowId);
|
|
||||||
if (!activeUsers) {
|
|
||||||
activeUsers = new Map();
|
|
||||||
activeUsersByWorkflowId.set(workflowId, activeUsers);
|
|
||||||
}
|
|
||||||
|
|
||||||
activeUsers.set(userId, {
|
|
||||||
userId,
|
|
||||||
lastSeen: new Date(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
|
|
||||||
const { activeUsersByWorkflowId } = this.state;
|
|
||||||
|
|
||||||
const activeUsers = activeUsersByWorkflowId.get(workflowId);
|
|
||||||
if (!activeUsers) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
activeUsers.delete(userId);
|
|
||||||
if (activeUsers.size === 0) {
|
|
||||||
activeUsersByWorkflowId.delete(workflowId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
getActiveWorkflowUsers(workflowId: Workflow['id']): ActiveWorkflowUser[] {
|
|
||||||
const workflowState = this.state.activeUsersByWorkflowId.get(workflowId);
|
|
||||||
if (!workflowState) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
return [...workflowState.values()];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes all users that have not been seen in a given time
|
|
||||||
*/
|
|
||||||
cleanInactiveUsers(workflowId: Workflow['id'], inactivityCleanUpTimeInMs: number) {
|
|
||||||
const activeUsers = this.state.activeUsersByWorkflowId.get(workflowId);
|
|
||||||
if (!activeUsers) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const now = Date.now();
|
|
||||||
for (const user of activeUsers.values()) {
|
|
||||||
if (now - user.lastSeen.getTime() > inactivityCleanUpTimeInMs) {
|
|
||||||
activeUsers.delete(user.userId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,9 +1,6 @@
|
|||||||
import { EventEmitter } from 'events';
|
|
||||||
import { assert, jsonStringify } from 'n8n-workflow';
|
import { assert, jsonStringify } from 'n8n-workflow';
|
||||||
import type { IPushDataType } from '@/Interfaces';
|
import type { IPushDataType } from '@/Interfaces';
|
||||||
import type { Logger } from '@/Logger';
|
import type { Logger } from '@/Logger';
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import type { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract class for two-way push communication.
|
* Abstract class for two-way push communication.
|
||||||
@@ -11,23 +8,16 @@ import type { OrchestrationService } from '@/services/orchestration.service';
|
|||||||
*
|
*
|
||||||
* @emits message when a message is received from a client
|
* @emits message when a message is received from a client
|
||||||
*/
|
*/
|
||||||
export abstract class AbstractPush<T> extends EventEmitter {
|
export abstract class AbstractPush<T> {
|
||||||
protected connections: Record<string, T> = {};
|
protected connections: Record<string, T> = {};
|
||||||
|
|
||||||
protected userIdByPushRef: Record<string, string> = {};
|
|
||||||
|
|
||||||
protected abstract close(connection: T): void;
|
protected abstract close(connection: T): void;
|
||||||
protected abstract sendToOneConnection(connection: T, data: string): void;
|
protected abstract sendToOneConnection(connection: T, data: string): void;
|
||||||
|
|
||||||
constructor(
|
constructor(protected readonly logger: Logger) {}
|
||||||
protected readonly logger: Logger,
|
|
||||||
private readonly orchestrationService: OrchestrationService,
|
|
||||||
) {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected add(pushRef: string, userId: User['id'], connection: T) {
|
protected add(pushRef: string, connection: T) {
|
||||||
const { connections, userIdByPushRef } = this;
|
const { connections } = this;
|
||||||
this.logger.debug('Add editor-UI session', { pushRef });
|
this.logger.debug('Add editor-UI session', { pushRef });
|
||||||
|
|
||||||
const existingConnection = connections[pushRef];
|
const existingConnection = connections[pushRef];
|
||||||
@@ -38,15 +28,6 @@ export abstract class AbstractPush<T> extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
connections[pushRef] = connection;
|
connections[pushRef] = connection;
|
||||||
userIdByPushRef[pushRef] = userId;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected onMessageReceived(pushRef: string, msg: unknown) {
|
|
||||||
this.logger.debug('Received message from editor-UI', { pushRef, msg });
|
|
||||||
|
|
||||||
const userId = this.userIdByPushRef[pushRef];
|
|
||||||
|
|
||||||
this.emit('message', { pushRef, userId, msg });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected remove(pushRef?: string) {
|
protected remove(pushRef?: string) {
|
||||||
@@ -55,7 +36,6 @@ export abstract class AbstractPush<T> extends EventEmitter {
|
|||||||
this.logger.debug('Removed editor-UI session', { pushRef });
|
this.logger.debug('Removed editor-UI session', { pushRef });
|
||||||
|
|
||||||
delete this.connections[pushRef];
|
delete this.connections[pushRef];
|
||||||
delete this.userIdByPushRef[pushRef];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
|
private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
|
||||||
@@ -77,21 +57,7 @@ export abstract class AbstractPush<T> extends EventEmitter {
|
|||||||
this.sendTo(type, data, Object.keys(this.connections));
|
this.sendTo(type, data, Object.keys(this.connections));
|
||||||
}
|
}
|
||||||
|
|
||||||
sendToOneSession(type: IPushDataType, data: unknown, pushRef: string) {
|
sendToOne(type: IPushDataType, data: unknown, pushRef: string) {
|
||||||
/**
|
|
||||||
* Multi-main setup: In a manual webhook execution, the main process that
|
|
||||||
* handles a webhook might not be the same as the main process that created
|
|
||||||
* the webhook. If so, the handler process commands the creator process to
|
|
||||||
* relay the former's execution lifecycle events to the creator's frontend.
|
|
||||||
*/
|
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled && !this.hasPushRef(pushRef)) {
|
|
||||||
const payload = { type, args: data, pushRef };
|
|
||||||
|
|
||||||
void this.orchestrationService.publish('relay-execution-lifecycle-event', payload);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.connections[pushRef] === undefined) {
|
if (this.connections[pushRef] === undefined) {
|
||||||
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
|
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
|
||||||
return;
|
return;
|
||||||
@@ -100,15 +66,6 @@ export abstract class AbstractPush<T> extends EventEmitter {
|
|||||||
this.sendTo(type, data, [pushRef]);
|
this.sendTo(type, data, [pushRef]);
|
||||||
}
|
}
|
||||||
|
|
||||||
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
|
|
||||||
const { connections } = this;
|
|
||||||
const userPushRefs = Object.keys(connections).filter((pushRef) =>
|
|
||||||
userIds.includes(this.userIdByPushRef[pushRef]),
|
|
||||||
);
|
|
||||||
|
|
||||||
this.sendTo(type, data, userPushRefs);
|
|
||||||
}
|
|
||||||
|
|
||||||
closeAllConnections() {
|
closeAllConnections() {
|
||||||
for (const pushRef in this.connections) {
|
for (const pushRef in this.connections) {
|
||||||
// Signal the connection that we want to close it.
|
// Signal the connection that we want to close it.
|
||||||
|
|||||||
@@ -6,15 +6,17 @@ import type { Application } from 'express';
|
|||||||
import { Server as WSServer } from 'ws';
|
import { Server as WSServer } from 'ws';
|
||||||
import { parse as parseUrl } from 'url';
|
import { parse as parseUrl } from 'url';
|
||||||
import { Container, Service } from 'typedi';
|
import { Container, Service } from 'typedi';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { SSEPush } from './sse.push';
|
|
||||||
import { WebSocketPush } from './websocket.push';
|
|
||||||
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
|
||||||
import type { IPushDataType } from '@/Interfaces';
|
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import { OnShutdown } from '@/decorators/OnShutdown';
|
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||||
import { AuthService } from '@/auth/auth.service';
|
import { AuthService } from '@/auth/auth.service';
|
||||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||||
|
import type { IPushDataType } from '@/Interfaces';
|
||||||
|
import { OrchestrationService } from '@/services/orchestration.service';
|
||||||
|
|
||||||
|
import { SSEPush } from './sse.push';
|
||||||
|
import { WebSocketPush } from './websocket.push';
|
||||||
|
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
||||||
|
|
||||||
const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
||||||
|
|
||||||
@@ -27,14 +29,10 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
|||||||
*/
|
*/
|
||||||
@Service()
|
@Service()
|
||||||
export class Push extends EventEmitter {
|
export class Push extends EventEmitter {
|
||||||
public isBidirectional = useWebSockets;
|
|
||||||
|
|
||||||
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
||||||
|
|
||||||
constructor() {
|
constructor(private readonly orchestrationService: OrchestrationService) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
|
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
|
||||||
@@ -54,9 +52,9 @@ export class Push extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (req.ws) {
|
if (req.ws) {
|
||||||
(this.backend as WebSocketPush).add(pushRef, user.id, req.ws);
|
(this.backend as WebSocketPush).add(pushRef, req.ws);
|
||||||
} else if (!useWebSockets) {
|
} else if (!useWebSockets) {
|
||||||
(this.backend as SSEPush).add(pushRef, user.id, { req, res });
|
(this.backend as SSEPush).add(pushRef, { req, res });
|
||||||
} else {
|
} else {
|
||||||
res.status(401).send('Unauthorized');
|
res.status(401).send('Unauthorized');
|
||||||
return;
|
return;
|
||||||
@@ -70,17 +68,25 @@ export class Push extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
send(type: IPushDataType, data: unknown, pushRef: string) {
|
send(type: IPushDataType, data: unknown, pushRef: string) {
|
||||||
this.backend.sendToOneSession(type, data, pushRef);
|
/**
|
||||||
|
* Multi-main setup: In a manual webhook execution, the main process that
|
||||||
|
* handles a webhook might not be the same as the main process that created
|
||||||
|
* the webhook. If so, the handler process commands the creator process to
|
||||||
|
* relay the former's execution lifecycle events to the creator's frontend.
|
||||||
|
*/
|
||||||
|
if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) {
|
||||||
|
const payload = { type, args: data, pushRef };
|
||||||
|
void this.orchestrationService.publish('relay-execution-lifecycle-event', payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.backend.sendToOne(type, data, pushRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
getBackend() {
|
getBackend() {
|
||||||
return this.backend;
|
return this.backend;
|
||||||
}
|
}
|
||||||
|
|
||||||
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
|
|
||||||
this.backend.sendToUsers(type, data, userIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnShutdown()
|
@OnShutdown()
|
||||||
onShutdown() {
|
onShutdown() {
|
||||||
this.backend.closeAllConnections();
|
this.backend.closeAllConnections();
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
import SSEChannel from 'sse-channel';
|
import SSEChannel from 'sse-channel';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
|
||||||
import { AbstractPush } from './abstract.push';
|
import { AbstractPush } from './abstract.push';
|
||||||
import type { PushRequest, PushResponse } from './types';
|
import type { PushRequest, PushResponse } from './types';
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
type Connection = { req: PushRequest; res: PushResponse };
|
type Connection = { req: PushRequest; res: PushResponse };
|
||||||
|
|
||||||
@@ -14,16 +14,16 @@ export class SSEPush extends AbstractPush<Connection> {
|
|||||||
|
|
||||||
readonly connections: Record<string, Connection> = {};
|
readonly connections: Record<string, Connection> = {};
|
||||||
|
|
||||||
constructor(logger: Logger, orchestrationService: OrchestrationService) {
|
constructor(logger: Logger) {
|
||||||
super(logger, orchestrationService);
|
super(logger);
|
||||||
|
|
||||||
this.channel.on('disconnect', (_, { req }) => {
|
this.channel.on('disconnect', (_, { req }) => {
|
||||||
this.remove(req?.query?.pushRef);
|
this.remove(req?.query?.pushRef);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
add(pushRef: string, userId: User['id'], connection: Connection) {
|
add(pushRef: string, connection: Connection) {
|
||||||
super.add(pushRef, userId, connection);
|
super.add(pushRef, connection);
|
||||||
this.channel.addClient(connection.req, connection.res);
|
this.channel.addClient(connection.req, connection.res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import type { Response } from 'express';
|
import type { Response } from 'express';
|
||||||
import type { WebSocket } from 'ws';
|
import type { WebSocket } from 'ws';
|
||||||
|
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import type { AuthenticatedRequest } from '@/requests';
|
import type { AuthenticatedRequest } from '@/requests';
|
||||||
|
|
||||||
// TODO: move all push related types here
|
// TODO: move all push related types here
|
||||||
@@ -12,9 +11,3 @@ export type SSEPushRequest = PushRequest & { ws: undefined };
|
|||||||
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
||||||
|
|
||||||
export type PushResponse = Response & { req: PushRequest };
|
export type PushResponse = Response & { req: PushRequest };
|
||||||
|
|
||||||
export type OnPushMessageEvent = {
|
|
||||||
pushRef: string;
|
|
||||||
userId: User['id'];
|
|
||||||
msg: unknown;
|
|
||||||
};
|
|
||||||
|
|||||||
@@ -2,8 +2,6 @@ import type WebSocket from 'ws';
|
|||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import { AbstractPush } from './abstract.push';
|
import { AbstractPush } from './abstract.push';
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
function heartbeat(this: WebSocket) {
|
function heartbeat(this: WebSocket) {
|
||||||
this.isAlive = true;
|
this.isAlive = true;
|
||||||
@@ -11,41 +9,24 @@ function heartbeat(this: WebSocket) {
|
|||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class WebSocketPush extends AbstractPush<WebSocket> {
|
export class WebSocketPush extends AbstractPush<WebSocket> {
|
||||||
constructor(logger: Logger, orchestrationService: OrchestrationService) {
|
constructor(logger: Logger) {
|
||||||
super(logger, orchestrationService);
|
super(logger);
|
||||||
|
|
||||||
// Ping all connected clients every 60 seconds
|
// Ping all connected clients every 60 seconds
|
||||||
setInterval(() => this.pingAll(), 60 * 1000);
|
setInterval(() => this.pingAll(), 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
add(pushRef: string, userId: User['id'], connection: WebSocket) {
|
add(pushRef: string, connection: WebSocket) {
|
||||||
connection.isAlive = true;
|
connection.isAlive = true;
|
||||||
connection.on('pong', heartbeat);
|
connection.on('pong', heartbeat);
|
||||||
|
|
||||||
super.add(pushRef, userId, connection);
|
super.add(pushRef, connection);
|
||||||
|
|
||||||
const onMessage = (data: WebSocket.RawData) => {
|
|
||||||
try {
|
|
||||||
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
|
|
||||||
|
|
||||||
this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8')));
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error("Couldn't parse message from editor-UI", {
|
|
||||||
error: error as unknown,
|
|
||||||
pushRef,
|
|
||||||
data,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Makes sure to remove the session if the connection is closed
|
// Makes sure to remove the session if the connection is closed
|
||||||
connection.once('close', () => {
|
connection.once('close', () => {
|
||||||
connection.off('pong', heartbeat);
|
connection.off('pong', heartbeat);
|
||||||
connection.off('message', onMessage);
|
|
||||||
this.remove(pushRef);
|
this.remove(pushRef);
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('message', onMessage);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected close(connection: WebSocket): void {
|
protected close(connection: WebSocket): void {
|
||||||
|
|||||||
@@ -1,151 +0,0 @@
|
|||||||
import { CollaborationService } from '@/collaboration/collaboration.service';
|
|
||||||
import type { Logger } from '@/Logger';
|
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import type { UserService } from '@/services/user.service';
|
|
||||||
import { CollaborationState } from '@/collaboration/collaboration.state';
|
|
||||||
import type { Push } from '@/push';
|
|
||||||
import type {
|
|
||||||
WorkflowClosedMessage,
|
|
||||||
WorkflowOpenedMessage,
|
|
||||||
} from '@/collaboration/collaboration.message';
|
|
||||||
import type { UserRepository } from '@/databases/repositories/user.repository';
|
|
||||||
import { mock } from 'jest-mock-extended';
|
|
||||||
|
|
||||||
describe('CollaborationService', () => {
|
|
||||||
let collaborationService: CollaborationService;
|
|
||||||
let mockLogger: Logger;
|
|
||||||
let mockUserService: jest.Mocked<UserService>;
|
|
||||||
let mockUserRepository: jest.Mocked<UserRepository>;
|
|
||||||
let state: CollaborationState;
|
|
||||||
let push: Push;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
mockLogger = mock<Logger>();
|
|
||||||
mockUserService = mock<UserService>();
|
|
||||||
mockUserRepository = mock<UserRepository>();
|
|
||||||
push = mock<Push>();
|
|
||||||
state = new CollaborationState();
|
|
||||||
|
|
||||||
collaborationService = new CollaborationService(
|
|
||||||
mockLogger,
|
|
||||||
push,
|
|
||||||
state,
|
|
||||||
mockUserService,
|
|
||||||
mockUserRepository,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('workflow opened message', () => {
|
|
||||||
const userId = 'test-user';
|
|
||||||
const workflowId = 'test-workflow';
|
|
||||||
|
|
||||||
const message: WorkflowOpenedMessage = {
|
|
||||||
type: 'workflowOpened',
|
|
||||||
workflowId,
|
|
||||||
};
|
|
||||||
|
|
||||||
const expectActiveUsersChangedMessage = () => {
|
|
||||||
expect(push.sendToUsers).toHaveBeenCalledWith(
|
|
||||||
'activeWorkflowUsersChanged',
|
|
||||||
{
|
|
||||||
workflowId,
|
|
||||||
activeUsers: [
|
|
||||||
{
|
|
||||||
user: { id: userId },
|
|
||||||
lastSeen: expect.any(Date),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
},
|
|
||||||
[userId],
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
describe('user is not yet active', () => {
|
|
||||||
it('updates state correctly', async () => {
|
|
||||||
mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([
|
|
||||||
{
|
|
||||||
lastSeen: expect.any(Date),
|
|
||||||
userId,
|
|
||||||
},
|
|
||||||
]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('sends active workflow users changed message', async () => {
|
|
||||||
mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expectActiveUsersChangedMessage();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('user is already active', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
state.addActiveWorkflowUser(workflowId, userId);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('updates state correctly', async () => {
|
|
||||||
mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([
|
|
||||||
{
|
|
||||||
lastSeen: expect.any(Date),
|
|
||||||
userId,
|
|
||||||
},
|
|
||||||
]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('sends active workflow users changed message', async () => {
|
|
||||||
mockUserRepository.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expectActiveUsersChangedMessage();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('workflow closed message', () => {
|
|
||||||
const userId = 'test-user';
|
|
||||||
const workflowId = 'test-workflow';
|
|
||||||
|
|
||||||
const message: WorkflowClosedMessage = {
|
|
||||||
type: 'workflowClosed',
|
|
||||||
workflowId,
|
|
||||||
};
|
|
||||||
|
|
||||||
describe('user is active', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
state.addActiveWorkflowUser(workflowId, userId);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('updates state correctly', async () => {
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('does not send active workflow users changed message', async () => {
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(push.sendToUsers).not.toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('user is not active', () => {
|
|
||||||
it('updates state correctly', async () => {
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('does not send active workflow users changed message', async () => {
|
|
||||||
await collaborationService.handleUserMessage(userId, message);
|
|
||||||
|
|
||||||
expect(push.sendToUsers).not.toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
import { TIME } from '@/constants';
|
|
||||||
import { CollaborationState } from '@/collaboration/collaboration.state';
|
|
||||||
|
|
||||||
const origDate = global.Date;
|
|
||||||
|
|
||||||
const mockDateFactory = (currentDate: string) => {
|
|
||||||
return class CustomDate extends origDate {
|
|
||||||
constructor() {
|
|
||||||
super(currentDate);
|
|
||||||
}
|
|
||||||
} as DateConstructor;
|
|
||||||
};
|
|
||||||
|
|
||||||
describe('CollaborationState', () => {
|
|
||||||
let collaborationState: CollaborationState;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
collaborationState = new CollaborationState();
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('cleanInactiveUsers', () => {
|
|
||||||
const workflowId = 'workflow';
|
|
||||||
|
|
||||||
it('should remove inactive users', () => {
|
|
||||||
// Setup
|
|
||||||
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');
|
|
||||||
collaborationState.addActiveWorkflowUser(workflowId, 'inactiveUser');
|
|
||||||
|
|
||||||
global.Date = mockDateFactory('2023-01-01T00:30:00.000Z');
|
|
||||||
collaborationState.addActiveWorkflowUser(workflowId, 'activeUser');
|
|
||||||
|
|
||||||
// Act: Clean inactive users
|
|
||||||
jest
|
|
||||||
.spyOn(global.Date, 'now')
|
|
||||||
.mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime());
|
|
||||||
collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE);
|
|
||||||
|
|
||||||
// Assert: The inactive user should be removed
|
|
||||||
expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([
|
|
||||||
{ userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') },
|
|
||||||
]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should not remove active users', () => {
|
|
||||||
// Setup: Add an active user to the state
|
|
||||||
global.Date = mockDateFactory('2023-01-01T00:30:00.000Z');
|
|
||||||
collaborationState.addActiveWorkflowUser(workflowId, 'activeUser');
|
|
||||||
|
|
||||||
// Act: Clean inactive users
|
|
||||||
jest
|
|
||||||
.spyOn(global.Date, 'now')
|
|
||||||
.mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime());
|
|
||||||
collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE);
|
|
||||||
|
|
||||||
// Assert: The active user should still be present
|
|
||||||
expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([
|
|
||||||
{ userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') },
|
|
||||||
]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -1,14 +1,16 @@
|
|||||||
import type { WebSocket } from 'ws';
|
import type { WebSocket } from 'ws';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import type { User } from '@db/entities/User';
|
import type { User } from '@db/entities/User';
|
||||||
import { Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import { SSEPush } from '@/push/sse.push';
|
import { SSEPush } from '@/push/sse.push';
|
||||||
import { WebSocketPush } from '@/push/websocket.push';
|
import { WebSocketPush } from '@/push/websocket.push';
|
||||||
import type { WebSocketPushRequest, SSEPushRequest } from '@/push/types';
|
import type { WebSocketPushRequest, SSEPushRequest } from '@/push/types';
|
||||||
import { mockInstance } from '../../shared/mocking';
|
|
||||||
import { mock } from 'jest-mock-extended';
|
|
||||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||||
|
|
||||||
|
import { mockInstance } from '../../shared/mocking';
|
||||||
|
|
||||||
jest.unmock('@/push');
|
jest.unmock('@/push');
|
||||||
|
|
||||||
describe('Push', () => {
|
describe('Push', () => {
|
||||||
@@ -19,7 +21,7 @@ describe('Push', () => {
|
|||||||
|
|
||||||
test('should validate pushRef on requests for websocket backend', () => {
|
test('should validate pushRef on requests for websocket backend', () => {
|
||||||
config.set('push.backend', 'websocket');
|
config.set('push.backend', 'websocket');
|
||||||
const push = new Push();
|
const push = new Push(mock());
|
||||||
const ws = mock<WebSocket>();
|
const ws = mock<WebSocket>();
|
||||||
const request = mock<WebSocketPushRequest>({ user, ws });
|
const request = mock<WebSocketPushRequest>({ user, ws });
|
||||||
request.query = { pushRef: '' };
|
request.query = { pushRef: '' };
|
||||||
@@ -32,7 +34,7 @@ describe('Push', () => {
|
|||||||
|
|
||||||
test('should validate pushRef on requests for SSE backend', () => {
|
test('should validate pushRef on requests for SSE backend', () => {
|
||||||
config.set('push.backend', 'sse');
|
config.set('push.backend', 'sse');
|
||||||
const push = new Push();
|
const push = new Push(mock());
|
||||||
const request = mock<SSEPushRequest>({ user, ws: undefined });
|
const request = mock<SSEPushRequest>({ user, ws: undefined });
|
||||||
request.query = { pushRef: '' };
|
request.query = { pushRef: '' };
|
||||||
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);
|
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
import Container from 'typedi';
|
import { Container } from 'typedi';
|
||||||
import { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
import type WebSocket from 'ws';
|
import type WebSocket from 'ws';
|
||||||
|
|
||||||
import { WebSocketPush } from '@/push/websocket.push';
|
import { WebSocketPush } from '@/push/websocket.push';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import type { User } from '@db/entities/User';
|
|
||||||
import type { PushDataExecutionRecovered } from '@/Interfaces';
|
import type { PushDataExecutionRecovered } from '@/Interfaces';
|
||||||
|
|
||||||
import { mockInstance } from '../../shared/mocking';
|
import { mockInstance } from '../../shared/mocking';
|
||||||
|
|
||||||
jest.useFakeTimers();
|
jest.useFakeTimers();
|
||||||
@@ -26,7 +27,6 @@ const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked<
|
|||||||
describe('WebSocketPush', () => {
|
describe('WebSocketPush', () => {
|
||||||
const pushRef1 = 'test-session1';
|
const pushRef1 = 'test-session1';
|
||||||
const pushRef2 = 'test-session2';
|
const pushRef2 = 'test-session2';
|
||||||
const userId: User['id'] = 'test-user';
|
|
||||||
|
|
||||||
mockInstance(Logger);
|
mockInstance(Logger);
|
||||||
const webSocketPush = Container.get(WebSocketPush);
|
const webSocketPush = Container.get(WebSocketPush);
|
||||||
@@ -38,26 +38,24 @@ describe('WebSocketPush', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('can add a connection', () => {
|
it('can add a connection', () => {
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||||
|
|
||||||
expect(mockWebSocket1.listenerCount('close')).toBe(1);
|
expect(mockWebSocket1.listenerCount('close')).toBe(1);
|
||||||
expect(mockWebSocket1.listenerCount('pong')).toBe(1);
|
expect(mockWebSocket1.listenerCount('pong')).toBe(1);
|
||||||
expect(mockWebSocket1.listenerCount('message')).toBe(1);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('closes a connection', () => {
|
it('closes a connection', () => {
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||||
|
|
||||||
mockWebSocket1.emit('close');
|
mockWebSocket1.emit('close');
|
||||||
|
|
||||||
expect(mockWebSocket1.listenerCount('close')).toBe(0);
|
expect(mockWebSocket1.listenerCount('close')).toBe(0);
|
||||||
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
|
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
|
||||||
expect(mockWebSocket1.listenerCount('message')).toBe(0);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('sends data to one connection', () => {
|
it('sends data to one connection', () => {
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||||
const data: PushDataExecutionRecovered = {
|
const data: PushDataExecutionRecovered = {
|
||||||
type: 'executionRecovered',
|
type: 'executionRecovered',
|
||||||
data: {
|
data: {
|
||||||
@@ -65,7 +63,7 @@ describe('WebSocketPush', () => {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
webSocketPush.sendToOneSession('executionRecovered', data, pushRef1);
|
webSocketPush.sendToOne('executionRecovered', data, pushRef1);
|
||||||
|
|
||||||
expect(mockWebSocket1.send).toHaveBeenCalledWith(
|
expect(mockWebSocket1.send).toHaveBeenCalledWith(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
@@ -82,8 +80,8 @@ describe('WebSocketPush', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('sends data to all connections', () => {
|
it('sends data to all connections', () => {
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||||
const data: PushDataExecutionRecovered = {
|
const data: PushDataExecutionRecovered = {
|
||||||
type: 'executionRecovered',
|
type: 'executionRecovered',
|
||||||
data: {
|
data: {
|
||||||
@@ -106,56 +104,13 @@ describe('WebSocketPush', () => {
|
|||||||
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
|
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('sends data to all users connections', () => {
|
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
|
||||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
|
||||||
const data: PushDataExecutionRecovered = {
|
|
||||||
type: 'executionRecovered',
|
|
||||||
data: {
|
|
||||||
executionId: 'test-execution-id',
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
webSocketPush.sendToUsers('executionRecovered', data, [userId]);
|
|
||||||
|
|
||||||
const expectedMsg = JSON.stringify({
|
|
||||||
type: 'executionRecovered',
|
|
||||||
data: {
|
|
||||||
type: 'executionRecovered',
|
|
||||||
data: {
|
|
||||||
executionId: 'test-execution-id',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
|
|
||||||
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('pings all connections', () => {
|
it('pings all connections', () => {
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||||
|
|
||||||
jest.runOnlyPendingTimers();
|
jest.runOnlyPendingTimers();
|
||||||
|
|
||||||
expect(mockWebSocket1.ping).toHaveBeenCalled();
|
expect(mockWebSocket1.ping).toHaveBeenCalled();
|
||||||
expect(mockWebSocket2.ping).toHaveBeenCalled();
|
expect(mockWebSocket2.ping).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('emits message event when connection receives data', () => {
|
|
||||||
const mockOnMessageReceived = jest.fn();
|
|
||||||
webSocketPush.on('message', mockOnMessageReceived);
|
|
||||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
|
||||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
|
||||||
|
|
||||||
const data = { test: 'data' };
|
|
||||||
const buffer = Buffer.from(JSON.stringify(data));
|
|
||||||
|
|
||||||
mockWebSocket1.emit('message', buffer);
|
|
||||||
|
|
||||||
expect(mockOnMessageReceived).toHaveBeenCalledWith({
|
|
||||||
msg: data,
|
|
||||||
pushRef: pushRef1,
|
|
||||||
userId,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -432,16 +432,6 @@ export interface IExecutionDeleteFilter {
|
|||||||
ids?: string[];
|
ids?: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
export type PushDataUsersForWorkflow = {
|
|
||||||
workflowId: string;
|
|
||||||
activeUsers: Array<{ user: IUser; lastSeen: string }>;
|
|
||||||
};
|
|
||||||
|
|
||||||
type PushDataWorkflowUsersChanged = {
|
|
||||||
data: PushDataUsersForWorkflow;
|
|
||||||
type: 'activeWorkflowUsersChanged';
|
|
||||||
};
|
|
||||||
|
|
||||||
export type IPushData =
|
export type IPushData =
|
||||||
| PushDataExecutionFinished
|
| PushDataExecutionFinished
|
||||||
| PushDataExecutionStarted
|
| PushDataExecutionStarted
|
||||||
@@ -456,8 +446,7 @@ export type IPushData =
|
|||||||
| PushDataWorkerStatusMessage
|
| PushDataWorkerStatusMessage
|
||||||
| PushDataActiveWorkflowAdded
|
| PushDataActiveWorkflowAdded
|
||||||
| PushDataActiveWorkflowRemoved
|
| PushDataActiveWorkflowRemoved
|
||||||
| PushDataWorkflowFailedToActivate
|
| PushDataWorkflowFailedToActivate;
|
||||||
| PushDataWorkflowUsersChanged;
|
|
||||||
|
|
||||||
export type PushDataActiveWorkflowAdded = {
|
export type PushDataActiveWorkflowAdded = {
|
||||||
data: IActiveWorkflowAdded;
|
data: IActiveWorkflowAdded;
|
||||||
|
|||||||
@@ -1,82 +0,0 @@
|
|||||||
<script setup lang="ts">
|
|
||||||
import { useUsersStore } from '@/stores/users.store';
|
|
||||||
import { useWorkflowsStore } from '@/stores/workflows.store';
|
|
||||||
import { useCollaborationStore } from '@/stores/collaboration.store';
|
|
||||||
import { onBeforeUnmount, onMounted, computed, ref } from 'vue';
|
|
||||||
import { TIME } from '@/constants';
|
|
||||||
import { isUserGlobalOwner } from '@/utils/userUtils';
|
|
||||||
|
|
||||||
const collaborationStore = useCollaborationStore();
|
|
||||||
const usersStore = useUsersStore();
|
|
||||||
const workflowsStore = useWorkflowsStore();
|
|
||||||
|
|
||||||
const HEARTBEAT_INTERVAL = 5 * TIME.MINUTE;
|
|
||||||
const heartbeatTimer = ref<number | null>(null);
|
|
||||||
|
|
||||||
const activeUsersSorted = computed(() => {
|
|
||||||
const currentWorkflowUsers = (collaborationStore.getUsersForCurrentWorkflow ?? []).map(
|
|
||||||
(userInfo) => userInfo.user,
|
|
||||||
);
|
|
||||||
const owner = currentWorkflowUsers.find(isUserGlobalOwner);
|
|
||||||
return {
|
|
||||||
defaultGroup: owner
|
|
||||||
? [owner, ...currentWorkflowUsers.filter((user) => user.id !== owner.id)]
|
|
||||||
: currentWorkflowUsers,
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
const currentUserEmail = computed(() => {
|
|
||||||
return usersStore.currentUser?.email;
|
|
||||||
});
|
|
||||||
|
|
||||||
const startHeartbeat = () => {
|
|
||||||
if (heartbeatTimer.value !== null) {
|
|
||||||
clearInterval(heartbeatTimer.value);
|
|
||||||
heartbeatTimer.value = null;
|
|
||||||
}
|
|
||||||
heartbeatTimer.value = window.setInterval(() => {
|
|
||||||
collaborationStore.notifyWorkflowOpened(workflowsStore.workflow.id);
|
|
||||||
}, HEARTBEAT_INTERVAL);
|
|
||||||
};
|
|
||||||
|
|
||||||
const stopHeartbeat = () => {
|
|
||||||
if (heartbeatTimer.value !== null) {
|
|
||||||
clearInterval(heartbeatTimer.value);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const onDocumentVisibilityChange = () => {
|
|
||||||
if (document.visibilityState === 'hidden') {
|
|
||||||
stopHeartbeat();
|
|
||||||
} else {
|
|
||||||
startHeartbeat();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
onMounted(() => {
|
|
||||||
collaborationStore.initialize();
|
|
||||||
startHeartbeat();
|
|
||||||
document.addEventListener('visibilitychange', onDocumentVisibilityChange);
|
|
||||||
});
|
|
||||||
|
|
||||||
onBeforeUnmount(() => {
|
|
||||||
document.removeEventListener('visibilitychange', onDocumentVisibilityChange);
|
|
||||||
stopHeartbeat();
|
|
||||||
collaborationStore.terminate();
|
|
||||||
});
|
|
||||||
</script>
|
|
||||||
|
|
||||||
<template>
|
|
||||||
<div
|
|
||||||
:class="`collaboration-pane-container ${$style.container}`"
|
|
||||||
data-test-id="collaboration-pane"
|
|
||||||
>
|
|
||||||
<n8n-user-stack :users="activeUsersSorted" :current-user-email="currentUserEmail" />
|
|
||||||
</div>
|
|
||||||
</template>
|
|
||||||
|
|
||||||
<style lang="scss" module>
|
|
||||||
.container {
|
|
||||||
margin: 0 var(--spacing-4xs);
|
|
||||||
}
|
|
||||||
</style>
|
|
||||||
@@ -22,7 +22,6 @@ import SaveButton from '@/components/SaveButton.vue';
|
|||||||
import TagsDropdown from '@/components/TagsDropdown.vue';
|
import TagsDropdown from '@/components/TagsDropdown.vue';
|
||||||
import InlineTextEdit from '@/components/InlineTextEdit.vue';
|
import InlineTextEdit from '@/components/InlineTextEdit.vue';
|
||||||
import BreakpointsObserver from '@/components/BreakpointsObserver.vue';
|
import BreakpointsObserver from '@/components/BreakpointsObserver.vue';
|
||||||
import CollaborationPane from '@/components/MainHeader/CollaborationPane.vue';
|
|
||||||
|
|
||||||
import { useRootStore } from '@/stores/root.store';
|
import { useRootStore } from '@/stores/root.store';
|
||||||
import { useSettingsStore } from '@/stores/settings.store';
|
import { useSettingsStore } from '@/stores/settings.store';
|
||||||
@@ -622,7 +621,6 @@ function showCreateWorkflowSuccessToast(id?: string) {
|
|||||||
</span>
|
</span>
|
||||||
<EnterpriseEdition :features="[EnterpriseEditionFeature.Sharing]">
|
<EnterpriseEdition :features="[EnterpriseEditionFeature.Sharing]">
|
||||||
<div :class="$style.group">
|
<div :class="$style.group">
|
||||||
<CollaborationPane />
|
|
||||||
<N8nButton
|
<N8nButton
|
||||||
type="secondary"
|
type="secondary"
|
||||||
data-test-id="workflow-share-button"
|
data-test-id="workflow-share-button"
|
||||||
|
|||||||
@@ -1,110 +0,0 @@
|
|||||||
import { merge } from 'lodash-es';
|
|
||||||
import { SETTINGS_STORE_DEFAULT_STATE, waitAllPromises } from '@/__tests__/utils';
|
|
||||||
import { ROLE, STORES } from '@/constants';
|
|
||||||
import { createTestingPinia } from '@pinia/testing';
|
|
||||||
import CollaborationPane from '@/components//MainHeader/CollaborationPane.vue';
|
|
||||||
import type { RenderOptions } from '@/__tests__/render';
|
|
||||||
import { createComponentRenderer } from '@/__tests__/render';
|
|
||||||
|
|
||||||
const OWNER_USER = {
|
|
||||||
createdAt: '2023-11-22T10:17:12.246Z',
|
|
||||||
id: 'aaaaaa',
|
|
||||||
email: 'owner@user.com',
|
|
||||||
firstName: 'Owner',
|
|
||||||
lastName: 'User',
|
|
||||||
role: ROLE.Owner,
|
|
||||||
disabled: false,
|
|
||||||
isPending: false,
|
|
||||||
fullName: 'Owner User',
|
|
||||||
};
|
|
||||||
|
|
||||||
const MEMBER_USER = {
|
|
||||||
createdAt: '2023-11-22T10:17:12.246Z',
|
|
||||||
id: 'aaabbb',
|
|
||||||
email: 'member@user.com',
|
|
||||||
firstName: 'Member',
|
|
||||||
lastName: 'User',
|
|
||||||
role: ROLE.Member,
|
|
||||||
disabled: false,
|
|
||||||
isPending: false,
|
|
||||||
fullName: 'Member User',
|
|
||||||
};
|
|
||||||
|
|
||||||
const MEMBER_USER_2 = {
|
|
||||||
createdAt: '2023-11-22T10:17:12.246Z',
|
|
||||||
id: 'aaaccc',
|
|
||||||
email: 'member2@user.com',
|
|
||||||
firstName: 'Another Member',
|
|
||||||
lastName: 'User',
|
|
||||||
role: ROLE.Member,
|
|
||||||
disabled: false,
|
|
||||||
isPending: false,
|
|
||||||
fullName: 'Another Member User',
|
|
||||||
};
|
|
||||||
|
|
||||||
const initialState = {
|
|
||||||
[STORES.SETTINGS]: {
|
|
||||||
settings: merge({}, SETTINGS_STORE_DEFAULT_STATE.settings),
|
|
||||||
},
|
|
||||||
[STORES.WORKFLOWS]: {
|
|
||||||
workflow: {
|
|
||||||
id: 'w1',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
[STORES.USERS]: {
|
|
||||||
currentUserId: 'aaaaaa',
|
|
||||||
users: {
|
|
||||||
aaaaaa: OWNER_USER,
|
|
||||||
aaabbb: MEMBER_USER,
|
|
||||||
aaaccc: MEMBER_USER_2,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
[STORES.COLLABORATION]: {
|
|
||||||
usersForWorkflows: {
|
|
||||||
w1: [
|
|
||||||
{ lastSeen: '2023-11-22T10:17:12.246Z', user: MEMBER_USER },
|
|
||||||
{ lastSeen: '2023-11-22T10:17:12.246Z', user: OWNER_USER },
|
|
||||||
],
|
|
||||||
w2: [{ lastSeen: '2023-11-22T10:17:12.246Z', user: MEMBER_USER_2 }],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const defaultRenderOptions: RenderOptions = {
|
|
||||||
pinia: createTestingPinia({ initialState }),
|
|
||||||
};
|
|
||||||
|
|
||||||
const renderComponent = createComponentRenderer(CollaborationPane, defaultRenderOptions);
|
|
||||||
|
|
||||||
describe('CollaborationPane', () => {
|
|
||||||
afterEach(() => {
|
|
||||||
vi.clearAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should show only current workflow users', async () => {
|
|
||||||
const { getByTestId, queryByTestId } = renderComponent();
|
|
||||||
await waitAllPromises();
|
|
||||||
|
|
||||||
expect(getByTestId('collaboration-pane')).toBeInTheDocument();
|
|
||||||
expect(getByTestId('user-stack-avatars')).toBeInTheDocument();
|
|
||||||
expect(getByTestId(`user-stack-avatar-${OWNER_USER.id}`)).toBeInTheDocument();
|
|
||||||
expect(getByTestId(`user-stack-avatar-${MEMBER_USER.id}`)).toBeInTheDocument();
|
|
||||||
expect(queryByTestId(`user-stack-avatar-${MEMBER_USER_2.id}`)).toBeNull();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should render current user correctly', async () => {
|
|
||||||
const { getByText, queryByText } = renderComponent();
|
|
||||||
await waitAllPromises();
|
|
||||||
expect(getByText(`${OWNER_USER.fullName} (you)`)).toBeInTheDocument();
|
|
||||||
expect(queryByText(`${MEMBER_USER.fullName} (you)`)).toBeNull();
|
|
||||||
expect(queryByText(`${MEMBER_USER.fullName}`)).toBeInTheDocument();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should always render owner first in the list', async () => {
|
|
||||||
const { getByTestId } = renderComponent();
|
|
||||||
await waitAllPromises();
|
|
||||||
const firstAvatar = getByTestId('user-stack-avatars').querySelector('.n8n-avatar');
|
|
||||||
// Owner is second in the store bur shourld be rendered first
|
|
||||||
expect(firstAvatar).toHaveAttribute('data-test-id', `user-stack-avatar-${OWNER_USER.id}`);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -2,7 +2,6 @@ import { usePushConnection } from '@/composables/usePushConnection';
|
|||||||
import { useRouter } from 'vue-router';
|
import { useRouter } from 'vue-router';
|
||||||
import { createPinia, setActivePinia } from 'pinia';
|
import { createPinia, setActivePinia } from 'pinia';
|
||||||
import { usePushConnectionStore } from '@/stores/pushConnection.store';
|
import { usePushConnectionStore } from '@/stores/pushConnection.store';
|
||||||
import { useCollaborationStore } from '@/stores/collaboration.store';
|
|
||||||
import type { IPushData, PushDataWorkerStatusMessage } from '@/Interface';
|
import type { IPushData, PushDataWorkerStatusMessage } from '@/Interface';
|
||||||
import { useOrchestrationStore } from '@/stores/orchestration.store';
|
import { useOrchestrationStore } from '@/stores/orchestration.store';
|
||||||
|
|
||||||
@@ -21,7 +20,6 @@ vi.useFakeTimers();
|
|||||||
describe('usePushConnection()', () => {
|
describe('usePushConnection()', () => {
|
||||||
let router: ReturnType<typeof useRouter>;
|
let router: ReturnType<typeof useRouter>;
|
||||||
let pushStore: ReturnType<typeof usePushConnectionStore>;
|
let pushStore: ReturnType<typeof usePushConnectionStore>;
|
||||||
let collaborationStore: ReturnType<typeof useCollaborationStore>;
|
|
||||||
let orchestrationStore: ReturnType<typeof useOrchestrationStore>;
|
let orchestrationStore: ReturnType<typeof useOrchestrationStore>;
|
||||||
let pushConnection: ReturnType<typeof usePushConnection>;
|
let pushConnection: ReturnType<typeof usePushConnection>;
|
||||||
|
|
||||||
@@ -30,7 +28,6 @@ describe('usePushConnection()', () => {
|
|||||||
|
|
||||||
router = vi.mocked(useRouter)();
|
router = vi.mocked(useRouter)();
|
||||||
pushStore = usePushConnectionStore();
|
pushStore = usePushConnectionStore();
|
||||||
collaborationStore = useCollaborationStore();
|
|
||||||
orchestrationStore = useOrchestrationStore();
|
orchestrationStore = useOrchestrationStore();
|
||||||
pushConnection = usePushConnection({ router });
|
pushConnection = usePushConnection({ router });
|
||||||
});
|
});
|
||||||
@@ -43,12 +40,6 @@ describe('usePushConnection()', () => {
|
|||||||
|
|
||||||
expect(spy).toHaveBeenCalled();
|
expect(spy).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should initialize collaborationStore', () => {
|
|
||||||
const spy = vi.spyOn(collaborationStore, 'initialize').mockImplementation(() => {});
|
|
||||||
pushConnection.initialize();
|
|
||||||
expect(spy).toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('terminate()', () => {
|
describe('terminate()', () => {
|
||||||
@@ -61,12 +52,6 @@ describe('usePushConnection()', () => {
|
|||||||
|
|
||||||
expect(returnFn).toHaveBeenCalled();
|
expect(returnFn).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should terminate collaborationStore', () => {
|
|
||||||
const spy = vi.spyOn(collaborationStore, 'terminate').mockImplementation(() => {});
|
|
||||||
pushConnection.terminate();
|
|
||||||
expect(spy).toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('queuePushMessage()', () => {
|
describe('queuePushMessage()', () => {
|
||||||
|
|||||||
@@ -35,7 +35,6 @@ import { parse } from 'flatted';
|
|||||||
import { ref } from 'vue';
|
import { ref } from 'vue';
|
||||||
import { useOrchestrationStore } from '@/stores/orchestration.store';
|
import { useOrchestrationStore } from '@/stores/orchestration.store';
|
||||||
import { usePushConnectionStore } from '@/stores/pushConnection.store';
|
import { usePushConnectionStore } from '@/stores/pushConnection.store';
|
||||||
import { useCollaborationStore } from '@/stores/collaboration.store';
|
|
||||||
import { useExternalHooks } from '@/composables/useExternalHooks';
|
import { useExternalHooks } from '@/composables/useExternalHooks';
|
||||||
import type { useRouter } from 'vue-router';
|
import type { useRouter } from 'vue-router';
|
||||||
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
|
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
|
||||||
@@ -51,7 +50,6 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
|
|||||||
const i18n = useI18n();
|
const i18n = useI18n();
|
||||||
const telemetry = useTelemetry();
|
const telemetry = useTelemetry();
|
||||||
|
|
||||||
const collaborationStore = useCollaborationStore();
|
|
||||||
const credentialsStore = useCredentialsStore();
|
const credentialsStore = useCredentialsStore();
|
||||||
const nodeTypesStore = useNodeTypesStore();
|
const nodeTypesStore = useNodeTypesStore();
|
||||||
const orchestrationManagerStore = useOrchestrationStore();
|
const orchestrationManagerStore = useOrchestrationStore();
|
||||||
@@ -68,11 +66,9 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
|
|||||||
removeEventListener.value = pushStore.addEventListener((message) => {
|
removeEventListener.value = pushStore.addEventListener((message) => {
|
||||||
void pushMessageReceived(message);
|
void pushMessageReceived(message);
|
||||||
});
|
});
|
||||||
collaborationStore.initialize();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function terminate() {
|
function terminate() {
|
||||||
collaborationStore.terminate();
|
|
||||||
if (typeof removeEventListener.value === 'function') {
|
if (typeof removeEventListener.value === 'function') {
|
||||||
removeEventListener.value();
|
removeEventListener.value();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -637,7 +637,6 @@ export const enum STORES {
|
|||||||
HISTORY = 'history',
|
HISTORY = 'history',
|
||||||
CLOUD_PLAN = 'cloudPlan',
|
CLOUD_PLAN = 'cloudPlan',
|
||||||
RBAC = 'rbac',
|
RBAC = 'rbac',
|
||||||
COLLABORATION = 'collaboration',
|
|
||||||
PUSH = 'push',
|
PUSH = 'push',
|
||||||
BECOME_TEMPLATE_CREATOR = 'becomeTemplateCreator',
|
BECOME_TEMPLATE_CREATOR = 'becomeTemplateCreator',
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,64 +0,0 @@
|
|||||||
import { defineStore } from 'pinia';
|
|
||||||
import { computed, ref } from 'vue';
|
|
||||||
import { useWorkflowsStore } from '@/stores/workflows.store';
|
|
||||||
import { usePushConnectionStore } from '@/stores/pushConnection.store';
|
|
||||||
import { STORES } from '@/constants';
|
|
||||||
import type { IUser } from '@/Interface';
|
|
||||||
|
|
||||||
type ActiveUsersForWorkflows = {
|
|
||||||
[workflowId: string]: Array<{ user: IUser; lastSeen: string }>;
|
|
||||||
};
|
|
||||||
|
|
||||||
export const useCollaborationStore = defineStore(STORES.COLLABORATION, () => {
|
|
||||||
const pushStore = usePushConnectionStore();
|
|
||||||
const workflowStore = useWorkflowsStore();
|
|
||||||
|
|
||||||
const usersForWorkflows = ref<ActiveUsersForWorkflows>({});
|
|
||||||
const pushStoreEventListenerRemovalFn = ref<(() => void) | null>(null);
|
|
||||||
|
|
||||||
const getUsersForCurrentWorkflow = computed(() => {
|
|
||||||
return usersForWorkflows.value[workflowStore.workflowId] ?? [];
|
|
||||||
});
|
|
||||||
|
|
||||||
function initialize() {
|
|
||||||
pushStoreEventListenerRemovalFn.value = pushStore.addEventListener((event) => {
|
|
||||||
if (event.type === 'activeWorkflowUsersChanged') {
|
|
||||||
const activeWorkflowId = workflowStore.workflowId;
|
|
||||||
if (event.data.workflowId === activeWorkflowId) {
|
|
||||||
usersForWorkflows.value[activeWorkflowId] = event.data.activeUsers;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function terminate() {
|
|
||||||
if (typeof pushStoreEventListenerRemovalFn.value === 'function') {
|
|
||||||
pushStoreEventListenerRemovalFn.value();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function workflowUsersUpdated(data: ActiveUsersForWorkflows) {
|
|
||||||
usersForWorkflows.value = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyWorkflowOpened(workflowId: string) {
|
|
||||||
pushStore.send({
|
|
||||||
type: 'workflowOpened',
|
|
||||||
workflowId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyWorkflowClosed(workflowId: string) {
|
|
||||||
pushStore.send({ type: 'workflowClosed', workflowId });
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
usersForWorkflows,
|
|
||||||
initialize,
|
|
||||||
terminate,
|
|
||||||
notifyWorkflowOpened,
|
|
||||||
notifyWorkflowClosed,
|
|
||||||
workflowUsersUpdated,
|
|
||||||
getUsersForCurrentWorkflow,
|
|
||||||
};
|
|
||||||
});
|
|
||||||
@@ -36,7 +36,6 @@ import { useCredentialsStore } from '@/stores/credentials.store';
|
|||||||
import useEnvironmentsStore from '@/stores/environments.ee.store';
|
import useEnvironmentsStore from '@/stores/environments.ee.store';
|
||||||
import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store';
|
import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store';
|
||||||
import { useRootStore } from '@/stores/root.store';
|
import { useRootStore } from '@/stores/root.store';
|
||||||
import { useCollaborationStore } from '@/stores/collaboration.store';
|
|
||||||
import { historyBus } from '@/models/history';
|
import { historyBus } from '@/models/history';
|
||||||
import { useCanvasOperations } from '@/composables/useCanvasOperations';
|
import { useCanvasOperations } from '@/composables/useCanvasOperations';
|
||||||
import { useExecutionsStore } from '@/stores/executions.store';
|
import { useExecutionsStore } from '@/stores/executions.store';
|
||||||
@@ -79,7 +78,6 @@ const credentialsStore = useCredentialsStore();
|
|||||||
const environmentsStore = useEnvironmentsStore();
|
const environmentsStore = useEnvironmentsStore();
|
||||||
const externalSecretsStore = useExternalSecretsStore();
|
const externalSecretsStore = useExternalSecretsStore();
|
||||||
const rootStore = useRootStore();
|
const rootStore = useRootStore();
|
||||||
const collaborationStore = useCollaborationStore();
|
|
||||||
const executionsStore = useExecutionsStore();
|
const executionsStore = useExecutionsStore();
|
||||||
const canvasStore = useCanvasStore();
|
const canvasStore = useCanvasStore();
|
||||||
const npsSurveyStore = useNpsSurveyStore();
|
const npsSurveyStore = useNpsSurveyStore();
|
||||||
@@ -173,7 +171,6 @@ async function initializeData() {
|
|||||||
workflowId: workflowsStore.workflow.id,
|
workflowId: workflowsStore.workflow.id,
|
||||||
workflowName: workflowsStore.workflow.name,
|
workflowName: workflowsStore.workflow.name,
|
||||||
});
|
});
|
||||||
collaborationStore.notifyWorkflowOpened(workflowsStore.workflow.id);
|
|
||||||
|
|
||||||
const selectedExecution = executionsStore.activeExecution;
|
const selectedExecution = executionsStore.activeExecution;
|
||||||
if (selectedExecution?.workflowId !== workflowsStore.workflow.id) {
|
if (selectedExecution?.workflowId !== workflowsStore.workflow.id) {
|
||||||
|
|||||||
@@ -246,7 +246,6 @@ import {
|
|||||||
AI_NODE_CREATOR_VIEW,
|
AI_NODE_CREATOR_VIEW,
|
||||||
DRAG_EVENT_DATA_KEY,
|
DRAG_EVENT_DATA_KEY,
|
||||||
UPDATE_WEBHOOK_ID_NODE_TYPES,
|
UPDATE_WEBHOOK_ID_NODE_TYPES,
|
||||||
TIME,
|
|
||||||
AI_ASSISTANT_LOCAL_STORAGE_KEY,
|
AI_ASSISTANT_LOCAL_STORAGE_KEY,
|
||||||
CANVAS_AUTO_ADD_MANUAL_TRIGGER_EXPERIMENT,
|
CANVAS_AUTO_ADD_MANUAL_TRIGGER_EXPERIMENT,
|
||||||
} from '@/constants';
|
} from '@/constants';
|
||||||
@@ -322,7 +321,6 @@ import type {
|
|||||||
import { type RouteLocation, useRouter } from 'vue-router';
|
import { type RouteLocation, useRouter } from 'vue-router';
|
||||||
import { dataPinningEventBus, nodeViewEventBus } from '@/event-bus';
|
import { dataPinningEventBus, nodeViewEventBus } from '@/event-bus';
|
||||||
import { useCanvasStore } from '@/stores/canvas.store';
|
import { useCanvasStore } from '@/stores/canvas.store';
|
||||||
import { useCollaborationStore } from '@/stores/collaboration.store';
|
|
||||||
import { useCredentialsStore } from '@/stores/credentials.store';
|
import { useCredentialsStore } from '@/stores/credentials.store';
|
||||||
import { useEnvironmentsStore } from '@/stores/environments.ee.store';
|
import { useEnvironmentsStore } from '@/stores/environments.ee.store';
|
||||||
import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store';
|
import { useExternalSecretsStore } from '@/stores/externalSecrets.ee.store';
|
||||||
@@ -471,18 +469,15 @@ export default defineComponent({
|
|||||||
|
|
||||||
await this.$router.push(to);
|
await this.$router.push(to);
|
||||||
} else {
|
} else {
|
||||||
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
|
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
} else if (confirmModal === MODAL_CANCEL) {
|
} else if (confirmModal === MODAL_CANCEL) {
|
||||||
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
|
|
||||||
this.workflowsStore.setWorkflowId(PLACEHOLDER_EMPTY_WORKFLOW_ID);
|
this.workflowsStore.setWorkflowId(PLACEHOLDER_EMPTY_WORKFLOW_ID);
|
||||||
this.resetWorkspace();
|
this.resetWorkspace();
|
||||||
this.uiStore.stateIsDirty = false;
|
this.uiStore.stateIsDirty = false;
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
|
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -588,7 +583,6 @@ export default defineComponent({
|
|||||||
useWorkflowsEEStore,
|
useWorkflowsEEStore,
|
||||||
useHistoryStore,
|
useHistoryStore,
|
||||||
useExternalSecretsStore,
|
useExternalSecretsStore,
|
||||||
useCollaborationStore,
|
|
||||||
usePushConnectionStore,
|
usePushConnectionStore,
|
||||||
useSourceControlStore,
|
useSourceControlStore,
|
||||||
useExecutionsStore,
|
useExecutionsStore,
|
||||||
@@ -1035,7 +1029,6 @@ export default defineComponent({
|
|||||||
if (!this.isDemo) {
|
if (!this.isDemo) {
|
||||||
this.pushStore.pushConnect();
|
this.pushStore.pushConnect();
|
||||||
}
|
}
|
||||||
this.collaborationStore.initialize();
|
|
||||||
},
|
},
|
||||||
beforeUnmount() {
|
beforeUnmount() {
|
||||||
// Make sure the event listeners get removed again else we
|
// Make sure the event listeners get removed again else we
|
||||||
@@ -1049,7 +1042,6 @@ export default defineComponent({
|
|||||||
if (!this.isDemo) {
|
if (!this.isDemo) {
|
||||||
this.pushStore.pushDisconnect();
|
this.pushStore.pushDisconnect();
|
||||||
}
|
}
|
||||||
this.collaborationStore.terminate();
|
|
||||||
|
|
||||||
this.resetWorkspace();
|
this.resetWorkspace();
|
||||||
this.instance.unbind();
|
this.instance.unbind();
|
||||||
@@ -1506,7 +1498,6 @@ export default defineComponent({
|
|||||||
this.executionsStore.activeExecution = selectedExecution;
|
this.executionsStore.activeExecution = selectedExecution;
|
||||||
}
|
}
|
||||||
this.canvasStore.stopLoading();
|
this.canvasStore.stopLoading();
|
||||||
this.collaborationStore.notifyWorkflowOpened(workflow.id);
|
|
||||||
},
|
},
|
||||||
touchTap(e: MouseEvent | TouchEvent) {
|
touchTap(e: MouseEvent | TouchEvent) {
|
||||||
if (this.deviceSupport.isTouchDevice) {
|
if (this.deviceSupport.isTouchDevice) {
|
||||||
@@ -3681,18 +3672,10 @@ export default defineComponent({
|
|||||||
if (this.isDemo || window.preventNodeViewBeforeUnload) {
|
if (this.isDemo || window.preventNodeViewBeforeUnload) {
|
||||||
return;
|
return;
|
||||||
} else if (this.uiStore.stateIsDirty) {
|
} else if (this.uiStore.stateIsDirty) {
|
||||||
// A bit hacky solution to detecting users leaving the page after prompt:
|
|
||||||
// 1. Notify that workflow is closed straight away
|
|
||||||
this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId);
|
|
||||||
// 2. If user decided to stay on the page we notify that the workflow is opened again
|
|
||||||
this.unloadTimeout = setTimeout(() => {
|
|
||||||
this.collaborationStore.notifyWorkflowOpened(this.workflowsStore.workflowId);
|
|
||||||
}, 5 * TIME.SECOND);
|
|
||||||
e.returnValue = true; //Gecko + IE
|
e.returnValue = true; //Gecko + IE
|
||||||
return true; //Gecko + Webkit, Safari, Chrome etc.
|
return true; //Gecko + Webkit, Safari, Chrome etc.
|
||||||
} else {
|
} else {
|
||||||
this.canvasStore.startLoading(this.$locale.baseText('nodeView.redirecting'));
|
this.canvasStore.startLoading(this.$locale.baseText('nodeView.redirecting'));
|
||||||
this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user