refactor(core): Rename push sessionId to pushRef (#8905)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-04-03 13:43:14 +02:00
committed by GitHub
parent eaaefd76da
commit 072c3db97d
58 changed files with 248 additions and 257 deletions

View File

@@ -1,4 +1,3 @@
import type express from 'express';
import { validate } from 'class-validator';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
@@ -7,13 +6,6 @@ import type { User } from '@db/entities/User';
import type { UserRoleChangePayload, UserUpdatePayload } from '@/requests';
import { BadRequestError } from './errors/response-errors/bad-request.error';
/**
* Returns the session id if one is set
*/
export function getSessionId(req: express.Request): string | undefined {
return req.headers.sessionid as string | undefined;
}
export async function validateEntity(
entity:
| WorkflowEntity

View File

@@ -532,7 +532,7 @@ export interface IWorkflowExecutionDataProcess {
runData?: IRunData;
pinData?: IPinData;
retryOf?: string;
sessionId?: string;
pushRef?: string;
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId: string;

View File

@@ -144,8 +144,8 @@ export class InternalHooks {
]);
}
async onFrontendSettingsAPI(sessionId?: string): Promise<void> {
return await this.telemetry.track('Session started', { session_id: sessionId });
async onFrontendSettingsAPI(pushRef?: string): Promise<void> {
return await this.telemetry.track('Session started', { session_id: pushRef });
}
async onPersonalizationSurveySubmitted(

View File

@@ -338,7 +338,7 @@ export class Server extends AbstractServer {
`/${this.restEndpoint}/settings`,
ResponseHelper.send(
async (req: express.Request): Promise<IN8nUISettings> =>
frontendService.getSettings(req.headers.sessionid as string),
frontendService.getSettings(req.headers['push-ref'] as string),
),
);
}

View File

@@ -91,7 +91,7 @@ export class TestWebhooks implements IWebhookManager {
});
}
const { destinationNode, sessionId, workflowEntity, webhook: testWebhook } = registration;
const { destinationNode, pushRef, workflowEntity, webhook: testWebhook } = registration;
const workflow = this.toWorkflow(workflowEntity);
@@ -112,7 +112,7 @@ export class TestWebhooks implements IWebhookManager {
workflowEntity,
workflowStartNode,
executionMode,
sessionId,
pushRef,
undefined, // IRunExecutionData
undefined, // executionId
request,
@@ -130,11 +130,11 @@ export class TestWebhooks implements IWebhookManager {
if (executionId === undefined) return;
// Inform editor-ui that webhook got received
if (sessionId !== undefined) {
if (pushRef !== undefined) {
this.push.send(
'testWebhookReceived',
{ workflowId: webhook?.workflowId, executionId },
sessionId,
pushRef,
);
}
} catch {}
@@ -147,10 +147,10 @@ export class TestWebhooks implements IWebhookManager {
*/
if (
this.orchestrationService.isMultiMainSetupEnabled &&
sessionId &&
!this.push.getBackend().hasSessionId(sessionId)
pushRef &&
!this.push.getBackend().hasPushRef(pushRef)
) {
const payload = { webhookKey: key, workflowEntity, sessionId };
const payload = { webhookKey: key, workflowEntity, pushRef };
void this.orchestrationService.publish('clear-test-webhooks', payload);
return;
}
@@ -213,7 +213,7 @@ export class TestWebhooks implements IWebhookManager {
workflowEntity: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
runData?: IRunData,
sessionId?: string,
pushRef?: string,
destinationNode?: string,
) {
if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity);
@@ -260,7 +260,7 @@ export class TestWebhooks implements IWebhookManager {
cacheableWebhook.userId = userId;
const registration: TestWebhookRegistration = {
sessionId,
pushRef,
workflowEntity,
destinationNode,
webhook: cacheableWebhook as IWebhookData,
@@ -302,7 +302,7 @@ export class TestWebhooks implements IWebhookManager {
if (!registration) continue;
const { sessionId, workflowEntity } = registration;
const { pushRef, workflowEntity } = registration;
const workflow = this.toWorkflow(workflowEntity);
@@ -310,9 +310,9 @@ export class TestWebhooks implements IWebhookManager {
this.clearTimeout(key);
if (sessionId !== undefined) {
if (pushRef !== undefined) {
try {
this.push.send('testWebhookDeleted', { workflowId }, sessionId);
this.push.send('testWebhookDeleted', { workflowId }, pushRef);
} catch {
// Could not inform editor, probably is not connected anymore. So simply go on.
}

View File

@@ -223,7 +223,7 @@ export async function executeWebhook(
workflowData: IWorkflowDb,
workflowStartNode: INode,
executionMode: WorkflowExecuteMode,
sessionId: string | undefined,
pushRef: string | undefined,
runExecutionData: IRunExecutionData | undefined,
executionId: string | undefined,
req: WebhookRequest,
@@ -541,7 +541,7 @@ export async function executeWebhook(
const runData: IWorkflowExecutionDataProcess = {
executionMode,
executionData: runExecutionData,
sessionId,
pushRef,
workflowData,
pinData,
userId: user.id,

View File

@@ -244,50 +244,50 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { sessionId, executionId } = this;
const { pushRef, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (sessionId === undefined) {
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
sessionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId);
pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, pushRef);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { sessionId, executionId } = this;
const { pushRef, executionId } = this;
// Push data to session which started workflow after each rendered node
if (sessionId === undefined) {
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
sessionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId);
pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, pushRef);
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
const { sessionId, executionId } = this;
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
sessionId,
pushRef,
workflowId,
});
// Push data to session which started the workflow
if (sessionId === undefined) {
if (pushRef === undefined) {
return;
}
pushInstance.send(
@@ -298,24 +298,24 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
startedAt: new Date(),
retryOf: this.retryOf,
workflowId,
sessionId,
pushRef,
workflowName,
},
sessionId,
pushRef,
);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { sessionId, executionId, retryOf } = this;
const { pushRef, executionId, retryOf } = this;
const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
sessionId,
pushRef,
workflowId,
});
// Push data to session which started the workflow
if (sessionId === undefined) {
if (pushRef === undefined) {
return;
}
@@ -351,7 +351,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
retryOf,
};
pushInstance.send('executionFinished', sendData, sessionId);
pushInstance.send('executionFinished', sendData, pushRef);
},
],
};
@@ -378,7 +378,7 @@ export function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
nodeName,
data,
executionData,
this.sessionId,
this.pushRef,
);
},
],
@@ -578,7 +578,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
ErrorReporter.error(e);
logger.error(
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
{ sessionId: this.sessionId, workflowId: this.workflowData.id },
{ pushRef: this.pushRef, workflowId: this.workflowData.id },
);
}
}
@@ -939,15 +939,15 @@ export function setExecutionStatus(status: ExecutionStatus) {
}
export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) {
const { sessionId } = this;
if (sessionId === undefined) {
const { pushRef } = this;
if (pushRef === undefined) {
return;
}
// Push data to session which started workflow
try {
const pushInstance = Container.get(Push);
pushInstance.send(type as IPushDataType, data, sessionId);
pushInstance.send(type as IPushDataType, data, pushRef);
} catch (error) {
const logger = Container.get(Logger);
logger.warn(`There was a problem sending message to UI: ${error.message}`);
@@ -1128,7 +1128,7 @@ export function getWorkflowHooksMain(
if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
sessionId: data.sessionId,
pushRef: data.pushRef,
retryOf: data.retryOf as string,
});
}

View File

@@ -295,7 +295,7 @@ export class WorkflowRunner {
});
additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({
sessionId: data.sessionId,
pushRef: data.pushRef,
});
await additionalData.hooks.executeHookFunctions('workflowExecuteBefore', []);

View File

@@ -55,7 +55,7 @@ type PushRequest = Request<
{},
{
type: IPushDataType;
sessionId: string;
pushRef: string;
data: object;
}
>;

View File

@@ -13,7 +13,7 @@ export async function saveExecutionProgress(
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
sessionId?: string,
pushRef?: string,
) {
const saveSettings = toSaveSettings(workflowData.settings);
@@ -97,7 +97,7 @@ export async function saveExecutionProgress(
{
...error,
executionId,
sessionId,
pushRef,
workflowId: workflowData.id,
},
);

View File

@@ -8,7 +8,7 @@ export const corsMiddleware: RequestHandler = (req, res, next) => {
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE');
res.header(
'Access-Control-Allow-Headers',
'Origin, X-Requested-With, Content-Type, Accept, sessionid',
'Origin, X-Requested-With, Content-Type, Accept, push-ref',
);
}

View File

@@ -14,7 +14,7 @@ import type { OrchestrationService } from '@/services/orchestration.service';
export abstract class AbstractPush<T> extends EventEmitter {
protected connections: Record<string, T> = {};
protected userIdBySessionId: Record<string, string> = {};
protected userIdByPushRef: Record<string, string> = {};
protected abstract close(connection: T): void;
protected abstract sendToOneConnection(connection: T, data: string): void;
@@ -26,100 +26,100 @@ export abstract class AbstractPush<T> extends EventEmitter {
super();
}
protected add(sessionId: string, userId: User['id'], connection: T) {
const { connections, userIdBySessionId: userIdsBySessionId } = this;
this.logger.debug('Add editor-UI session', { sessionId });
protected add(pushRef: string, userId: User['id'], connection: T) {
const { connections, userIdByPushRef } = this;
this.logger.debug('Add editor-UI session', { pushRef });
const existingConnection = connections[sessionId];
const existingConnection = connections[pushRef];
if (existingConnection) {
// Make sure to remove existing connection with the same ID
this.close(existingConnection);
}
connections[sessionId] = connection;
userIdsBySessionId[sessionId] = userId;
connections[pushRef] = connection;
userIdByPushRef[pushRef] = userId;
}
protected onMessageReceived(sessionId: string, msg: unknown) {
this.logger.debug('Received message from editor-UI', { sessionId, msg });
protected onMessageReceived(pushRef: string, msg: unknown) {
this.logger.debug('Received message from editor-UI', { pushRef, msg });
const userId = this.userIdBySessionId[sessionId];
const userId = this.userIdByPushRef[pushRef];
this.emit('message', { sessionId, userId, msg });
this.emit('message', { pushRef, userId, msg });
}
protected remove(sessionId?: string) {
if (!sessionId) return;
protected remove(pushRef?: string) {
if (!pushRef) return;
this.logger.debug('Removed editor-UI session', { sessionId });
this.logger.debug('Removed editor-UI session', { pushRef });
delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
delete this.connections[pushRef];
delete this.userIdByPushRef[pushRef];
}
private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) {
private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
sessionIds: sessionIds.join(', '),
pushRefs: pushRefs.join(', '),
});
const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true });
for (const sessionId of sessionIds) {
const connection = this.connections[sessionId];
for (const pushRef of pushRefs) {
const connection = this.connections[pushRef];
assert(connection);
this.sendToOneConnection(connection, stringifiedPayload);
}
}
sendToAllSessions(type: IPushDataType, data?: unknown) {
this.sendToSessions(type, data, Object.keys(this.connections));
sendToAll(type: IPushDataType, data?: unknown) {
this.sendTo(type, data, Object.keys(this.connections));
}
sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) {
sendToOneSession(type: IPushDataType, data: unknown, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecyle events to the creator's frontend.
* relay the former's execution lifecycle events to the creator's frontend.
*/
if (this.orchestrationService.isMultiMainSetupEnabled && !this.hasSessionId(sessionId)) {
const payload = { type, args: data, sessionId };
if (this.orchestrationService.isMultiMainSetupEnabled && !this.hasPushRef(pushRef)) {
const payload = { type, args: data, pushRef };
void this.orchestrationService.publish('relay-execution-lifecycle-event', payload);
return;
}
if (this.connections[sessionId] === undefined) {
this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
if (this.connections[pushRef] === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
}
this.sendToSessions(type, data, [sessionId]);
this.sendTo(type, data, [pushRef]);
}
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
const { connections } = this;
const userSessionIds = Object.keys(connections).filter((sessionId) =>
userIds.includes(this.userIdBySessionId[sessionId]),
const userPushRefs = Object.keys(connections).filter((pushRef) =>
userIds.includes(this.userIdByPushRef[pushRef]),
);
this.sendToSessions(type, data, userSessionIds);
this.sendTo(type, data, userPushRefs);
}
closeAllConnections() {
for (const sessionId in this.connections) {
for (const pushRef in this.connections) {
// Signal the connection that we want to close it.
// We are not removing the sessions here because it should be
// the implementation's responsibility to do so once the connection
// has actually closed.
this.close(this.connections[sessionId]);
this.close(this.connections[pushRef]);
}
}
hasSessionId(sessionId: string) {
return this.connections[sessionId] !== undefined;
hasPushRef(pushRef: string) {
return this.connections[pushRef] !== undefined;
}
}

View File

@@ -41,36 +41,36 @@ export class Push extends EventEmitter {
const {
user,
ws,
query: { sessionId },
query: { pushRef },
} = req;
if (!sessionId) {
if (!pushRef) {
if (ws) {
ws.send('The query parameter "sessionId" is missing!');
ws.send('The query parameter "pushRef" is missing!');
ws.close(1008);
return;
}
throw new BadRequestError('The query parameter "sessionId" is missing!');
throw new BadRequestError('The query parameter "pushRef" is missing!');
}
if (req.ws) {
(this.backend as WebSocketPush).add(sessionId, user.id, req.ws);
(this.backend as WebSocketPush).add(pushRef, user.id, req.ws);
} else if (!useWebSockets) {
(this.backend as SSEPush).add(sessionId, user.id, { req, res });
(this.backend as SSEPush).add(pushRef, user.id, { req, res });
} else {
res.status(401).send('Unauthorized');
return;
}
this.emit('editorUiConnected', sessionId);
this.emit('editorUiConnected', pushRef);
}
broadcast(type: IPushDataType, data?: unknown) {
this.backend.sendToAllSessions(type, data);
this.backend.sendToAll(type, data);
}
send(type: IPushDataType, data: unknown, sessionId: string) {
this.backend.sendToOneSession(type, data, sessionId);
send(type: IPushDataType, data: unknown, pushRef: string) {
this.backend.sendToOneSession(type, data, pushRef);
}
getBackend() {

View File

@@ -17,13 +17,13 @@ export class SSEPush extends AbstractPush<Connection> {
constructor(logger: Logger, orchestrationService: OrchestrationService) {
super(logger, orchestrationService);
this.channel.on('disconnect', (channel, { req }) => {
this.remove(req?.query?.sessionId);
this.channel.on('disconnect', (_, { req }) => {
this.remove(req?.query?.pushRef);
});
}
add(sessionId: string, userId: User['id'], connection: Connection) {
super.add(sessionId, userId, connection);
add(pushRef: string, userId: User['id'], connection: Connection) {
super.add(pushRef, userId, connection);
this.channel.addClient(connection.req, connection.res);
}

View File

@@ -6,7 +6,7 @@ import type { AuthenticatedRequest } from '@/requests';
// TODO: move all push related types here
export type PushRequest = AuthenticatedRequest<{}, {}, {}, { sessionId: string }>;
export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined };
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
@@ -14,7 +14,7 @@ export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
export type PushResponse = Response & { req: PushRequest };
export type OnPushMessageEvent = {
sessionId: string;
pushRef: string;
userId: User['id'];
msg: unknown;
};

View File

@@ -18,21 +18,21 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
setInterval(() => this.pingAll(), 60 * 1000);
}
add(sessionId: string, userId: User['id'], connection: WebSocket) {
add(pushRef: string, userId: User['id'], connection: WebSocket) {
connection.isAlive = true;
connection.on('pong', heartbeat);
super.add(sessionId, userId, connection);
super.add(pushRef, userId, connection);
const onMessage = (data: WebSocket.RawData) => {
try {
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
this.onMessageReceived(sessionId, JSON.parse(buffer.toString('utf8')));
this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8')));
} catch (error) {
this.logger.error("Couldn't parse message from editor-UI", {
error: error as unknown,
sessionId,
pushRef,
data,
});
}
@@ -42,7 +42,7 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
connection.once('close', () => {
connection.off('pong', heartbeat);
connection.off('message', onMessage);
this.remove(sessionId);
this.remove(pushRef);
});
connection.on('message', onMessage);
@@ -57,11 +57,11 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
}
private pingAll() {
for (const sessionId in this.connections) {
const connection = this.connections[sessionId];
for (const pushRef in this.connections) {
const connection = this.connections[pushRef];
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
if (!connection.isAlive) {
delete this.connections[sessionId];
delete this.connections[pushRef];
return connection.terminate();
}

View File

@@ -225,8 +225,8 @@ export class FrontendService {
this.writeStaticJSON('credentials', credentials);
}
getSettings(sessionId?: string): IN8nUISettings {
void this.internalHooks.onFrontendSettingsAPI(sessionId);
getSettings(pushRef?: string): IN8nUISettings {
void this.internalHooks.onFrontendSettingsAPI(pushRef);
const restEndpoint = config.getEnv('endpoints.rest');

View File

@@ -196,11 +196,11 @@ export async function handleCommandMessageMain(messageString: string) {
* Do not debounce this - all events share the same message name.
*/
const { type, args, sessionId } = message.payload;
const { type, args, pushRef } = message.payload;
if (!push.getBackend().hasSessionId(sessionId)) break;
if (!push.getBackend().hasPushRef(pushRef)) break;
push.send(type, args, sessionId);
push.send(type, args, pushRef);
break;
}
@@ -212,9 +212,9 @@ export async function handleCommandMessageMain(messageString: string) {
return message;
}
const { webhookKey, workflowEntity, sessionId } = message.payload;
const { webhookKey, workflowEntity, pushRef } = message.payload;
if (!push.getBackend().hasSessionId(sessionId)) break;
if (!push.getBackend().hasPushRef(pushRef)) break;
const testWebhooks = Container.get(TestWebhooks);

View File

@@ -35,12 +35,12 @@ export type RedisServiceBaseCommand =
| {
senderId: string;
command: 'relay-execution-lifecycle-event';
payload: { type: IPushDataType; args: Record<string, unknown>; sessionId: string };
payload: { type: IPushDataType; args: Record<string, unknown>; pushRef: string };
}
| {
senderId: string;
command: 'clear-test-webhooks';
payload: { webhookKey: string; workflowEntity: IWorkflowDb; sessionId: string };
payload: { webhookKey: string; workflowEntity: IWorkflowDb; pushRef: string };
};
export type RedisServiceWorkerResponseObject = {

View File

@@ -5,7 +5,7 @@ import type { IWorkflowDb } from '@/Interfaces';
import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants';
export type TestWebhookRegistration = {
sessionId?: string;
pushRef?: string;
workflowEntity: IWorkflowDb;
destinationNode?: string;
webhook: IWebhookData;

View File

@@ -99,7 +99,7 @@ export class WorkflowExecutionService {
destinationNode,
}: WorkflowRequest.ManualRunPayload,
user: User,
sessionId?: string,
pushRef?: string,
) {
const pinnedTrigger = this.selectPinnedActivatorStarter(
workflowData,
@@ -122,7 +122,7 @@ export class WorkflowExecutionService {
workflowData,
additionalData,
runData,
sessionId,
pushRef,
destinationNode,
);
@@ -138,7 +138,7 @@ export class WorkflowExecutionService {
executionMode: 'manual',
runData,
pinData,
sessionId,
pushRef,
startNodes,
workflowData,
userId: user.id,

View File

@@ -3,7 +3,6 @@ import { v4 as uuid } from 'uuid';
import axios from 'axios';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import type { IWorkflowResponse } from '@/Interfaces';
@@ -331,7 +330,7 @@ export class WorkflowsController {
return await this.workflowExecutionService.executeManually(
req.body,
req.user,
GenericHelpers.getSessionId(req),
req.headers['push-ref'] as string,
);
}