perf: Lazy-load queue-mode and analytics dependencies (#5061)

* refactor: lazy load ioredis and bull

* upgrade bull and hiredis

* refactor: lazy load posthog, rudderstack, and sentry

* upgrade Sentry sdk
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-01-02 12:14:39 +01:00
committed by GitHub
parent 7e3f3c5097
commit b828cb31d6
17 changed files with 240 additions and 131 deletions

View File

@@ -65,7 +65,6 @@
"@oclif/dev-cli": "^1.22.2",
"@types/basic-auth": "^1.1.2",
"@types/bcryptjs": "^2.4.2",
"@types/bull": "^3.3.10",
"@types/compression": "1.0.1",
"@types/connect-history-api-fallback": "^1.3.1",
"@types/convict": "^4.2.1",
@@ -108,14 +107,14 @@
"@oclif/core": "^1.16.4",
"@oclif/errors": "^1.3.6",
"@rudderstack/rudder-sdk-node": "1.0.6",
"@sentry/integrations": "^7.17.3",
"@sentry/node": "^7.17.3",
"@sentry/integrations": "^7.28.1",
"@sentry/node": "^7.28.1",
"axios": "^0.21.1",
"basic-auth": "^2.0.1",
"bcryptjs": "^2.4.3",
"body-parser": "^1.18.3",
"body-parser-xml": "^2.0.3",
"bull": "^3.19.0",
"bull": "^4.10.2",
"callsites": "^3.1.0",
"change-case": "^4.1.1",
"class-validator": "^0.13.1",
@@ -135,7 +134,7 @@
"google-timezones-json": "^1.0.2",
"handlebars": "4.7.7",
"inquirer": "^7.0.1",
"ioredis": "^4.28.5",
"ioredis": "^5.2.4",
"json-diff": "^0.5.4",
"jsonschema": "^1.4.1",
"jsonwebtoken": "^8.5.1",

View File

@@ -1,12 +1,10 @@
import * as Sentry from '@sentry/node';
import { RewriteFrames } from '@sentry/integrations';
import type { Application } from 'express';
import config from '@/config';
import { ErrorReporterProxy } from 'n8n-workflow';
let initialized = false;
export const initErrorHandling = () => {
export const initErrorHandling = async () => {
if (initialized) return;
if (!config.getEnv('diagnostics.enabled')) {
@@ -20,7 +18,11 @@ export const initErrorHandling = () => {
const dsn = config.getEnv('diagnostics.config.sentry.dsn');
const { N8N_VERSION: release, ENVIRONMENT: environment } = process.env;
Sentry.init({
const { init, captureException } = await import('@sentry/node');
// eslint-disable-next-line @typescript-eslint/naming-convention
const { RewriteFrames } = await import('@sentry/integrations');
init({
dsn,
release,
environment,
@@ -37,14 +39,16 @@ export const initErrorHandling = () => {
});
ErrorReporterProxy.init({
report: (error, options) => Sentry.captureException(error, options),
report: (error, options) => captureException(error, options),
});
initialized = true;
};
export const setupErrorMiddleware = (app: Application) => {
const { requestHandler, errorHandler } = Sentry.Handlers;
export const setupErrorMiddleware = async (app: Application) => {
const {
Handlers: { requestHandler, errorHandler },
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
};

View File

@@ -13,10 +13,16 @@ export class InternalHooksManager {
throw new Error('InternalHooks not initialized');
}
static init(instanceId: string, versionCli: string, nodeTypes: INodeTypes): InternalHooksClass {
static async init(
instanceId: string,
versionCli: string,
nodeTypes: INodeTypes,
): Promise<InternalHooksClass> {
if (!this.internalHooksInstance) {
const telemetry = new Telemetry(instanceId, versionCli);
await telemetry.init();
this.internalHooksInstance = new InternalHooksClass(
new Telemetry(instanceId, versionCli),
telemetry,
instanceId,
versionCli,
nodeTypes,

View File

@@ -1,4 +1,5 @@
import Bull from 'bull';
import type Bull from 'bull';
import type { RedisOptions } from 'ioredis';
import { IExecuteResponsePromiseData } from 'n8n-workflow';
import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions';
@@ -22,15 +23,17 @@ export interface WebhookResponse {
}
export class Queue {
private activeExecutions: ActiveExecutions.ActiveExecutions;
private jobQueue: JobQueue;
constructor() {
this.activeExecutions = ActiveExecutions.getInstance();
constructor(private activeExecutions: ActiveExecutions.ActiveExecutions) {}
async init() {
const prefix = config.getEnv('queue.bull.prefix');
const redisOptions = config.getEnv('queue.bull.redis');
const redisOptions: RedisOptions = config.getEnv('queue.bull.redis');
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Bull } = await import('bull');
// Disabling ready check is necessary as it allows worker to
// quickly reconnect to Redis if Redis crashes or is unreachable
// for some time. With it enabled, worker might take minutes to realize
@@ -89,9 +92,10 @@ export class Queue {
let activeQueueInstance: Queue | undefined;
export function getInstance(): Queue {
export async function getInstance(): Promise<Queue> {
if (activeQueueInstance === undefined) {
activeQueueInstance = new Queue();
activeQueueInstance = new Queue(ActiveExecutions.getInstance());
await activeQueueInstance.init();
}
return activeQueueInstance;

View File

@@ -270,7 +270,7 @@ class App {
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
setupErrorMiddleware(this.app);
void setupErrorMiddleware(this.app);
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
@@ -1299,7 +1299,8 @@ class App {
ResponseHelper.send(
async (req: ExecutionRequest.GetAllCurrent): Promise<IExecutionsSummary[]> => {
if (config.getEnv('executions.mode') === 'queue') {
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
const queue = await Queue.getInstance();
const currentJobs = await queue.getJobs(['active', 'waiting']);
const currentlyRunningQueueIds = currentJobs.map((job) => job.data.executionId);
@@ -1428,14 +1429,15 @@ class App {
} as IExecutionsStopData;
}
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
const queue = await Queue.getInstance();
const currentJobs = await queue.getJobs(['active', 'waiting']);
const job = currentJobs.find((job) => job.data.executionId.toString() === req.params.id);
if (!job) {
throw new Error(`Could not stop "${req.params.id}" as it is no longer in queue.`);
} else {
await Queue.getInstance().stopJob(job);
await queue.stopJob(job);
}
const executionDb = (await Db.collections.Execution.findOne(

View File

@@ -213,7 +213,7 @@ class App {
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
setupErrorMiddleware(this.app);
void setupErrorMiddleware(this.app);
}
/**

View File

@@ -62,19 +62,10 @@ export class WorkflowRunner {
constructor() {
this.push = Push.getInstance();
this.activeExecutions = ActiveExecutions.getInstance();
const executionsMode = config.getEnv('executions.mode');
if (executionsMode === 'queue') {
this.jobQueue = Queue.getInstance().getBullObjectInstance();
}
initErrorHandling();
}
/**
* The process did send a hook message so execute the appropriate hook
*
*/
processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -83,7 +74,6 @@ export class WorkflowRunner {
/**
* The process did error
*
*/
async processError(
error: ExecutionError,
@@ -133,13 +123,20 @@ export class WorkflowRunner {
executionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const executionsProcess = config.getEnv('executions.process');
const executionsMode = config.getEnv('executions.mode');
const executionsProcess = config.getEnv('executions.process');
await initErrorHandling();
if (executionsMode === 'queue') {
const queue = await Queue.getInstance();
this.jobQueue = queue.getBullObjectInstance();
}
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
executionId = await this.runBull(
executionId = await this.enqueueExecution(
data,
loadStaticData,
realtime,
@@ -378,7 +375,7 @@ export class WorkflowRunner {
return executionId;
}
async runBull(
async enqueueExecution(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
@@ -444,7 +441,8 @@ export class WorkflowRunner {
async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
await Queue.getInstance().stopJob(job);
const queue = await Queue.getInstance();
await queue.stopJob(job);
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.

View File

@@ -76,14 +76,12 @@ class WorkflowRunnerProcess {
}, 30000);
}
constructor() {
initErrorHandling();
}
async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
process.once('SIGTERM', WorkflowRunnerProcess.stopProcess);
process.once('SIGINT', WorkflowRunnerProcess.stopProcess);
await initErrorHandling();
// eslint-disable-next-line no-multi-assign
const logger = (this.logger = getLogger());
LoggerProxy.init(logger);
@@ -114,7 +112,7 @@ class WorkflowRunnerProcess {
const instanceId = (await UserSettings.prepareUserSettings()).instanceId ?? '';
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);

View File

@@ -139,7 +139,7 @@ export class Execute extends Command {
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, cli, nodeTypes);
if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) {
workflowId = undefined;

View File

@@ -327,7 +327,7 @@ export class ExecuteBatch extends Command {
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, cli, nodeTypes);
// Send a shallow copy of allWorkflows so we still have all workflow data.
const results = await this.runTests([...allWorkflows]);

View File

@@ -12,7 +12,6 @@ import { createReadStream, createWriteStream, existsSync } from 'fs';
import localtunnel from 'localtunnel';
import { BinaryDataManager, TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command';
import Redis from 'ioredis';
import stream from 'stream';
import replaceStream from 'replacestream';
import { promisify } from 'util';
@@ -225,7 +224,7 @@ export class Start extends Command {
LoggerProxy.init(logger);
logger.info('Initializing n8n process');
initErrorHandling();
await initErrorHandling();
await CrashJournal.init();
// eslint-disable-next-line @typescript-eslint/no-shadow
@@ -394,6 +393,9 @@ export class Start extends Command {
settings.db = redisDB;
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above
@@ -466,7 +468,7 @@ export class Start extends Command {
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);

View File

@@ -6,7 +6,6 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { BinaryDataManager, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command';
import Redis from 'ioredis';
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import config from '@/config';
@@ -93,7 +92,7 @@ export class Webhook extends Command {
process.once('SIGTERM', Webhook.stopProcess);
process.once('SIGINT', Webhook.stopProcess);
initErrorHandling();
await initErrorHandling();
await CrashJournal.init();
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow
@@ -153,7 +152,7 @@ export class Webhook extends Command {
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
@@ -203,6 +202,9 @@ export class Webhook extends Command {
settings.db = redisDB;
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above

View File

@@ -299,14 +299,15 @@ export class Worker extends Command {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
Worker.jobQueue = Queue.getInstance().getBullObjectInstance();
const queue = await Queue.getInstance();
Worker.jobQueue = queue.getBullObjectInstance();
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, nodeTypes));
const versions = await GenericHelpers.getVersions();
const instanceId = await UserSettings.getInstanceId();
InternalHooksManager.init(instanceId, versions.cli, nodeTypes);
await InternalHooksManager.init(instanceId, versions.cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);

View File

@@ -180,7 +180,8 @@ export class ExecutionsService {
const executingWorkflowIds: string[] = [];
if (config.getEnv('executions.mode') === 'queue') {
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
const queue = await Queue.getInstance();
const currentJobs = await queue.getJobs(['active', 'waiting']);
executingWorkflowIds.push(...currentJobs.map(({ data }) => data.executionId));
}

View File

@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import RudderStack from '@rudderstack/rudder-sdk-node';
import { PostHog } from 'posthog-node';
import type RudderStack from '@rudderstack/rudder-sdk-node';
import type { PostHog } from 'posthog-node';
import { ITelemetryTrackProperties, LoggerProxy } from 'n8n-workflow';
import config from '@/config';
import { IExecutionTrackProperties } from '@/Interfaces';
@@ -31,20 +31,14 @@ export class Telemetry {
private postHog?: PostHog;
private instanceId: string;
private versionCli: string;
private pulseIntervalReference: NodeJS.Timeout;
private executionCountsBuffer: IExecutionsBuffer = {};
constructor(instanceId: string, versionCli: string) {
this.instanceId = instanceId;
this.versionCli = versionCli;
constructor(private instanceId: string, private versionCli: string) {}
async init() {
const enabled = config.getEnv('diagnostics.enabled');
const logLevel = config.getEnv('logs.level');
if (enabled) {
const conf = config.getEnv('diagnostics.config.backend');
const [key, url] = conf.split(';');
@@ -56,21 +50,20 @@ export class Telemetry {
return;
}
this.rudderStack = this.initRudderStack(key, url, logLevel);
this.postHog = this.initPostHog();
const logLevel = config.getEnv('logs.level');
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: RudderStack } = await import('@rudderstack/rudder-sdk-node');
this.rudderStack = new RudderStack(key, url, { logLevel });
// eslint-disable-next-line @typescript-eslint/naming-convention
const { PostHog } = await import('posthog-node');
this.postHog = new PostHog(config.getEnv('diagnostics.config.posthog.apiKey'));
this.startPulse();
}
}
private initRudderStack(key: string, url: string, logLevel: string): RudderStack {
return new RudderStack(key, url, { logLevel });
}
private initPostHog(): PostHog {
return new PostHog(config.getEnv('diagnostics.config.posthog.apiKey'));
}
private startPulse() {
this.pulseIntervalReference = setInterval(async () => {
void this.pulse();

View File

@@ -10,16 +10,6 @@ jest.mock('@/license/License.service', () => {
};
});
jest.mock('posthog-node');
jest.spyOn(Telemetry.prototype as any, 'initRudderStack').mockImplementation(() => {
return {
flush: () => {},
identify: () => {},
track: () => {},
};
});
describe('Telemetry', () => {
let startPulseSpy: jest.SpyInstance;
const spyTrack = jest.spyOn(Telemetry.prototype, 'track').mockName('track');
@@ -49,6 +39,11 @@ describe('Telemetry', () => {
beforeEach(() => {
spyTrack.mockClear();
telemetry = new Telemetry(instanceId, n8nVersion);
(telemetry as any).rudderStack = {
flush: () => {},
identify: () => {},
track: () => {},
};
});
afterEach(() => {