Files
n8n-enterprise-unlocked/packages/cli/src/collaboration/collaboration.service.ts

105 lines
3.3 KiB
TypeScript

import type { PushPayload } from '@n8n/api-types';
import type { User } from '@n8n/db';
import { UserRepository } from '@n8n/db';
import { Service } from '@n8n/di';
import { ErrorReporter } from 'n8n-core';
import type { Workflow } from 'n8n-workflow';
import { UnexpectedError } from 'n8n-workflow';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { Push } from '@/push';
import type { OnPushMessage } from '@/push/types';
import { AccessService } from '@/services/access.service';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { parseWorkflowMessage } from './collaboration.message';
/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly errorReporter: ErrorReporter,
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userRepository: UserRepository,
private readonly accessService: AccessService,
) {}
init() {
this.push.on('message', async (event: OnPushMessage) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
this.errorReporter.error(
new UnexpectedError('Error handling CollaborationService push message', {
extra: {
msg: event.msg,
userId: event.userId,
},
cause: error,
}),
);
}
});
}
async handleUserMessage(userId: User['id'], msg: unknown) {
const workflowMessage = await parseWorkflowMessage(msg);
if (workflowMessage.type === 'workflowOpened') {
await this.handleWorkflowOpened(userId, workflowMessage);
} else if (workflowMessage.type === 'workflowClosed') {
await this.handleWorkflowClosed(userId, workflowMessage);
}
}
private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) {
const { workflowId } = msg;
if (!(await this.accessService.hasReadAccess(userId, workflowId))) {
return;
}
await this.state.addCollaborator(workflowId, userId);
await this.sendWorkflowUsersChangedMessage(workflowId);
}
private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) {
const { workflowId } = msg;
if (!(await this.accessService.hasReadAccess(userId, workflowId))) {
return;
}
await this.state.removeCollaborator(workflowId, userId);
await this.sendWorkflowUsersChangedMessage(workflowId);
}
private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
// We have already validated that all active workflow users
// have proper access to the workflow, so we don't need to validate it again
const collaborators = await this.state.getCollaborators(workflowId);
const userIds = collaborators.map((user) => user.userId);
if (userIds.length === 0) {
return;
}
const users = await this.userRepository.getByIds(this.userRepository.manager, userIds);
const activeCollaborators = users.map((user) => ({
user: user.toIUser(),
lastSeen: collaborators.find(({ userId }) => userId === user.id)!.lastSeen,
}));
const msgData: PushPayload<'collaboratorsChanged'> = {
workflowId,
collaborators: activeCollaborators,
};
this.push.sendToUsers({ type: 'collaboratorsChanged', data: msgData }, userIds);
}
}