feat(editor): Show avatars for users currently working on the same workflow (#7763)

This PR introduces the following changes:
- New Vue stores: `collaborationStore` and `pushConnectionStore`
- Front-end push connection handling overhaul: Keep only a singe
connection open and handle it from the new store
- Add user avatars in the editor header when there are multiple users
working on the same workflow
- Sending a heartbeat event to back-end service periodically to confirm
user is still active

- Back-end overhauls (authored by @tomi):
  - Implementing a cleanup procedure that removes inactive users
  - Refactoring collaboration service current implementation

---------

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Milorad FIlipović
2023-11-23 10:14:34 +01:00
committed by GitHub
parent 99a9ea497a
commit 77bc8ecd4b
18 changed files with 654 additions and 148 deletions

View File

@@ -35,6 +35,8 @@ import { parse } from 'flatted';
import { useSegment } from '@/stores/segment.store';
import { defineComponent } from 'vue';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
export const pushConnection = defineComponent({
setup() {
@@ -43,15 +45,16 @@ export const pushConnection = defineComponent({
...useToast(),
};
},
created() {
this.pushStore.addEventListener((message) => {
void this.pushMessageReceived(message);
});
},
mixins: [externalHooks, nodeHelpers, workflowHelpers],
data() {
return {
pushSource: null as WebSocket | EventSource | null,
reconnectTimeout: null as NodeJS.Timeout | null,
retryTimeout: null as NodeJS.Timeout | null,
pushMessageQueue: [] as Array<{ event: Event; retriesLeft: number }>,
connectRetries: 0,
lostConnection: false,
pushMessageQueue: [] as Array<{ message: IPushData; retriesLeft: number }>,
};
},
computed: {
@@ -63,95 +66,22 @@ export const pushConnection = defineComponent({
useSettingsStore,
useSegment,
useOrchestrationStore,
usePushConnectionStore,
useCollaborationStore,
),
sessionId(): string {
return this.rootStore.sessionId;
},
},
methods: {
attemptReconnect() {
this.pushConnect();
},
/**
* Connect to server to receive data via a WebSocket or EventSource
*/
pushConnect(): void {
// always close the previous connection so that we do not end up with multiple connections
this.pushDisconnect();
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
const useWebSockets = this.settingsStore.pushBackend === 'websocket';
const { getRestUrl: restUrl } = this.rootStore;
const url = `/push?sessionId=${this.sessionId}`;
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
this.pushSource = new WebSocket(`${baseUrl}${url}`);
} else {
this.pushSource = new EventSource(`${restUrl}${url}`, { withCredentials: true });
}
this.pushSource.addEventListener('open', this.onConnectionSuccess, false);
this.pushSource.addEventListener('message', this.pushMessageReceived, false);
this.pushSource.addEventListener(
useWebSockets ? 'close' : 'error',
this.onConnectionError,
false,
);
},
onConnectionSuccess() {
this.connectRetries = 0;
this.lostConnection = false;
this.rootStore.pushConnectionActive = true;
try {
// in the workers view context this fn is not defined
this.clearAllStickyNotifications();
} catch {}
this.pushSource?.removeEventListener('open', this.onConnectionSuccess);
},
onConnectionError() {
this.pushDisconnect();
this.connectRetries++;
this.reconnectTimeout = setTimeout(
this.attemptReconnect,
Math.min(this.connectRetries * 2000, 8000), // maximum 8 seconds backoff
);
},
/**
* Close connection to server
*/
pushDisconnect(): void {
if (this.pushSource !== null) {
this.pushSource.removeEventListener('error', this.onConnectionError);
this.pushSource.removeEventListener('close', this.onConnectionError);
this.pushSource.removeEventListener('message', this.pushMessageReceived);
if (this.pushSource.readyState < 2) this.pushSource.close();
this.pushSource = null;
}
this.rootStore.pushConnectionActive = false;
},
/**
* Sometimes the push message is faster as the result from
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*/
queuePushMessage(event: Event, retryAttempts: number) {
this.pushMessageQueue.push({ event, retriesLeft: retryAttempts });
queuePushMessage(event: IPushData, retryAttempts: number) {
this.pushMessageQueue.push({ message: event, retriesLeft: retryAttempts });
if (this.retryTimeout === null) {
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 20);
@@ -161,7 +91,7 @@ export const pushConnection = defineComponent({
/**
* Process the push messages which are waiting in the queue
*/
processWaitingPushMessages() {
async processWaitingPushMessages() {
if (this.retryTimeout !== null) {
clearTimeout(this.retryTimeout);
this.retryTimeout = null;
@@ -171,7 +101,8 @@ export const pushConnection = defineComponent({
for (let i = 0; i < queueLength; i++) {
const messageData = this.pushMessageQueue.shift();
if (this.pushMessageReceived(messageData!.event, true) === false) {
const result = await this.pushMessageReceived(messageData!.message, true);
if (result === false) {
// Was not successful
messageData!.retriesLeft -= 1;
@@ -191,15 +122,8 @@ export const pushConnection = defineComponent({
/**
* Process a newly received message
*/
async pushMessageReceived(event: Event, isRetry?: boolean): Promise<boolean> {
async pushMessageReceived(receivedData: IPushData, isRetry?: boolean): Promise<boolean> {
const retryAttempts = 5;
let receivedData: IPushData;
try {
// @ts-ignore
receivedData = JSON.parse(event.data);
} catch (error) {
return false;
}
if (receivedData.type === 'sendWorkerStatusMessage') {
const pushData = receivedData.data;
@@ -220,7 +144,7 @@ export const pushConnection = defineComponent({
) {
// If there are already messages in the queue add the new one that all of them
// get executed in order
this.queuePushMessage(event, retryAttempts);
this.queuePushMessage(receivedData, retryAttempts);
return false;
}
@@ -596,7 +520,7 @@ export const pushConnection = defineComponent({
this.workflowsStore.activeExecutionId = pushData.executionId;
}
this.processWaitingPushMessages();
void this.processWaitingPushMessages();
} else if (receivedData.type === 'reloadNodeType') {
await this.nodeTypesStore.getNodeTypes();
await this.nodeTypesStore.getFullNodesProperties([receivedData.data]);