refactor(core): Make all pubsub messages type-safe (#10990)

This commit is contained in:
Iván Ovejero
2024-09-27 12:35:01 +02:00
committed by GitHub
parent 08a27b3148
commit bf7392a878
17 changed files with 285 additions and 306 deletions

View File

@@ -3,13 +3,10 @@ import { mock } from 'jest-mock-extended';
import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/scaling/redis/redis-service-commands';
import type { RedisClientService } from '@/services/redis-client.service';
import { Publisher } from '../pubsub/publisher.service';
import type { PubSub } from '../pubsub/pubsub.types';
describe('Publisher', () => {
let queueModeId: string;
@@ -49,7 +46,7 @@ describe('Publisher', () => {
describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceCommandObject>({ command: 'reloadLicense' });
const msg = mock<PubSub.Command>({ command: 'reload-license' });
await publisher.publishCommand(msg);
@@ -63,8 +60,8 @@ describe('Publisher', () => {
describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceWorkerResponseObject>({
command: 'reloadExternalSecretsProviders',
const msg = mock<PubSub.WorkerResponse>({
command: 'reload-external-secrets-providers',
});
await publisher.publishWorkerResponse(msg);

View File

@@ -3,12 +3,10 @@ import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/scaling/redis/redis-service-commands';
import { RedisClientService } from '@/services/redis-client.service';
import type { PubSub } from './pubsub.types';
/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.
*/
@@ -44,7 +42,7 @@ export class Publisher {
// #region Publishing
/** Publish a command into the `n8n.commands` channel. */
async publishCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
async publishCommand(msg: Omit<PubSub.Command, 'senderId'>) {
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
@@ -54,7 +52,7 @@ export class Publisher {
}
/** Publish a response for a command into the `n8n.worker-response` channel. */
async publishWorkerResponse(msg: RedisServiceWorkerResponseObject) {
async publishWorkerResponse(msg: PubSub.WorkerResponse) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg));
this.logger.debug(`Published response for ${msg.command} to worker response channel`);

View File

@@ -1,96 +1,198 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
import type { Resolve } from '@/utlity.types';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
/** Pubsub channel used by scaling mode. */
export type PubSubChannel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
export namespace PubSub {
// ----------------------------------
// channels
// ----------------------------------
/** Handler function for every message received via a `PubSubChannel`. */
export type PubSubHandlerFn = (msg: string) => void;
/** Pubsub channel used by scaling mode. */
export type Channel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
export type PubSubMessageMap = {
// #region Lifecycle
/** Handler function for every message received via a pubsub channel. */
export type HandlerFn = (msg: string) => void;
'reload-license': never;
// ----------------------------------
// commands
// ----------------------------------
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
export type CommandMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': never;
'reload-external-secrets-providers': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': never;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
type _ToCommand<CommandKey extends keyof CommandMap> = {
senderId: string;
targets?: string[];
command: CommandKey;
} & (CommandMap[CommandKey] extends never
? { payload?: never } // some commands carry no payload
: { payload: CommandMap[CommandKey] });
type ToCommand<CommandKey extends keyof CommandMap> = Resolve<_ToCommand<CommandKey>>;
namespace Command {
export type ReloadLicense = ToCommand<'reload-license'>;
export type RestartEventBus = ToCommand<'restart-event-bus'>;
export type ReloadExternalSecretsProviders = ToCommand<'reload-external-secrets-providers'>;
export type CommunityPackageInstall = ToCommand<'community-package-install'>;
export type CommunityPackageUpdate = ToCommand<'community-package-update'>;
export type CommunityPackageUninstall = ToCommand<'community-package-uninstall'>;
export type GetWorkerId = ToCommand<'get-worker-id'>;
export type GetWorkerStatus = ToCommand<'get-worker-status'>;
export type AddWebhooksTriggersAndPollers = ToCommand<'add-webhooks-triggers-and-pollers'>;
export type RemoveTriggersAndPollers = ToCommand<'remove-triggers-and-pollers'>;
export type DisplayWorkflowActivation = ToCommand<'display-workflow-activation'>;
export type DisplayWorkflowDeactivation = ToCommand<'display-workflow-deactivation'>;
export type DisplayWorkflowActivationError = ToCommand<'display-workflow-activation-error'>;
export type RelayExecutionLifecycleEvent = ToCommand<'relay-execution-lifecycle-event'>;
export type ClearTestWebhooks = ToCommand<'clear-test-webhooks'>;
}
/** Command sent via the `n8n.commands` pubsub channel. */
export type Command =
| Command.ReloadLicense
| Command.RestartEventBus
| Command.ReloadExternalSecretsProviders
| Command.CommunityPackageInstall
| Command.CommunityPackageUpdate
| Command.CommunityPackageUninstall
| Command.GetWorkerId
| Command.GetWorkerStatus
| Command.AddWebhooksTriggersAndPollers
| Command.RemoveTriggersAndPollers
| Command.DisplayWorkflowActivation
| Command.DisplayWorkflowDeactivation
| Command.DisplayWorkflowActivationError
| Command.RelayExecutionLifecycleEvent
| Command.ClearTestWebhooks;
// ----------------------------------
// worker responses
// ----------------------------------
export type WorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
};
'stop-worker': never;
type _ToWorkerResponse<WorkerResponseKey extends keyof WorkerResponseMap> = {
workerId: string;
targets?: string[];
command: WorkerResponseKey;
} & (WorkerResponseMap[WorkerResponseKey] extends never
? { payload?: never } // some responses carry no payload
: { payload: WorkerResponseMap[WorkerResponseKey] });
// #endregion
type ToWorkerResponse<WorkerResponseKey extends keyof WorkerResponseMap> = Resolve<
_ToWorkerResponse<WorkerResponseKey>
>;
// #region Community packages
namespace WorkerResponse {
export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>;
export type ReloadExternalSecretsProviders =
ToWorkerResponse<'reload-external-secrets-providers'>;
export type GetWorkerId = ToWorkerResponse<'get-worker-id'>;
export type GetWorkerStatus = ToWorkerResponse<'get-worker-status'>;
}
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
packageVersion: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
// currently 'workflow-failed-to-activate'
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};
/** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse =
| WorkerResponse.RestartEventBus
| WorkerResponse.ReloadExternalSecretsProviders
| WorkerResponse.GetWorkerId
| WorkerResponse.GetWorkerStatus;
}

View File

@@ -5,7 +5,7 @@ import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis-client.service';
import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types';
import type { PubSub } from './pubsub.types';
/**
* Responsible for subscribing to the pubsub channels used by scaling mode.
@@ -14,7 +14,7 @@ import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types';
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;
private readonly handlers: Map<PubSubChannel, PubSubHandlerFn> = new Map();
private readonly handlers: Map<PubSub.Channel, PubSub.HandlerFn> = new Map();
// #region Lifecycle
@@ -29,7 +29,7 @@ export class Subscriber {
this.client.on('error', (error) => this.logger.error(error.message));
this.client.on('message', (channel: PubSubChannel, message) => {
this.client.on('message', (channel: PubSub.Channel, message) => {
this.handlers.get(channel)?.(message);
});
}
@@ -47,7 +47,7 @@ export class Subscriber {
// #region Subscribing
async subscribe(channel: PubSubChannel) {
async subscribe(channel: PubSub.Channel) {
await this.client.subscribe(channel, (error) => {
if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error });
@@ -59,7 +59,7 @@ export class Subscriber {
}
/** Set the message handler function for a channel. */
setMessageHandler(channel: PubSubChannel, handlerFn: PubSubHandlerFn) {
setMessageHandler(channel: PubSub.Channel, handlerFn: PubSub.HandlerFn) {
this.handlers.set(channel, handlerFn);
}

View File

@@ -1,103 +0,0 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type RedisServiceCommand =
| 'getStatus'
| 'getId'
| 'restartEventBus'
| 'stopWorker'
| 'reloadLicense'
| 'reloadExternalSecretsProviders'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
| 'display-workflow-activation' // multi-main only
| 'display-workflow-deactivation' // multi-main only
| 'add-webhooks-triggers-and-pollers' // multi-main only
| 'remove-triggers-and-pollers' // multi-main only
| 'workflow-failed-to-activate' // multi-main only
| 'relay-execution-lifecycle-event' // multi-main only
| 'clear-test-webhooks'; // multi-main only
/**
* An object to be sent via Redis pubsub from the main process to the workers.
* @field command: The command to be executed.
* @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids.
* @field payload: Optional arguments to be sent with the command.
*/
export type RedisServiceBaseCommand =
| {
senderId: string;
command: Exclude<
RedisServiceCommand,
| 'relay-execution-lifecycle-event'
| 'clear-test-webhooks'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>;
payload?: {
[key: string]: string | number | boolean | string[] | number[] | boolean[];
};
}
| {
senderId: string;
command: 'relay-execution-lifecycle-event';
payload: { type: PushType; args: Record<string, unknown>; pushRef: string };
}
| {
senderId: string;
command: 'clear-test-webhooks';
payload: { webhookKey: string; workflowEntity: IWorkflowDb; pushRef: string };
}
| {
senderId: string;
command:
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall';
payload: { packageName: string; packageVersion: string };
};
export type RedisServiceWorkerResponseObject = {
workerId: string;
} & (
| RedisServiceBaseCommand
| {
command: 'getStatus';
payload: WorkerStatus;
}
| {
command: 'getId';
}
| {
command: 'restartEventBus';
payload: {
result: 'success' | 'error';
error?: string;
};
}
| {
command: 'reloadExternalSecretsProviders';
payload: {
result: 'success' | 'error';
error?: string;
};
}
| {
command: 'stopWorker';
}
| {
command: 'workflowActiveStateChanged';
payload: {
oldState: boolean;
newState: boolean;
workflowId: string;
};
}
) & { targets?: string[] };
export type RedisServiceCommandObject = {
targets?: string[];
} & RedisServiceBaseCommand;

View File

@@ -3,8 +3,8 @@ export type RedisClientType = N8nRedisClientType | BullRedisClientType;
/**
* Redis client used by n8n.
*
* - `subscriber(n8n)` to listen for messages from scaling mode communication channels
* - `publisher(n8n)` to send messages into scaling mode communication channels
* - `subscriber(n8n)` to listen for messages from scaling mode pubsub channels
* - `publisher(n8n)` to send messages into scaling mode pubsub channels
* - `cache(n8n)` for caching operations (variables, resource ownership, etc.)
*/
type N8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)';