feat(core): Introduce scoped logging (#11127)

This commit is contained in:
Iván Ovejero
2024-10-09 12:56:31 +02:00
committed by GitHub
parent 518e320404
commit c68782c633
23 changed files with 221 additions and 122 deletions

View File

@@ -1,6 +1,17 @@
import { Config, Env, Nested } from '../decorators';
import { StringArray } from '../utils';
/**
* Scopes (areas of functionality) to filter logs by.
*
* `executions` -> execution lifecycle
* `license` -> license SDK
* `scaling` -> scaling mode
*/
export const LOG_SCOPES = ['executions', 'license', 'scaling'] as const;
export type LogScope = (typeof LOG_SCOPES)[number];
@Config
class FileLoggingConfig {
/**
@@ -44,4 +55,19 @@ export class LoggingConfig {
@Nested
file: FileLoggingConfig;
/**
* Scopes to filter logs by. Nothing is filtered by default.
*
* Currently supported log scopes:
* - `executions`
* - `license`
* - `scaling`
*
* @example
* `N8N_LOG_SCOPES=license`
* `N8N_LOG_SCOPES=license,executions`
*/
@Env('N8N_LOG_SCOPES')
scopes: StringArray<LogScope> = [];
}

View File

@@ -18,6 +18,9 @@ import { VersionNotificationsConfig } from './configs/version-notifications.conf
import { WorkflowsConfig } from './configs/workflows.config';
import { Config, Env, Nested } from './decorators';
export { LOG_SCOPES } from './configs/logging.config';
export type { LogScope } from './configs/logging.config';
@Config
export class GlobalConfig {
@Nested

View File

@@ -241,13 +241,13 @@ describe('GlobalConfig', () => {
fileSizeMax: 16,
location: 'logs/n8n.log',
},
scopes: [],
},
};
it('should use all default values when no env variables are defined', () => {
process.env = {};
const config = Container.get(GlobalConfig);
expect(deepCopy(config)).toEqual(defaultConfig);
expect(mockFs.readFileSync).not.toHaveBeenCalled();
});

View File

@@ -5,7 +5,7 @@ import type { InstanceSettings } from 'n8n-core';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { License } from '@/license';
import type { Logger } from '@/logging/logger.service';
import { mockLogger } from '@test/mocking';
jest.mock('@n8n_io/license-sdk');
@@ -25,37 +25,39 @@ describe('License', () => {
});
let license: License;
const logger = mock<Logger>();
const instanceSettings = mock<InstanceSettings>({
instanceId: MOCK_INSTANCE_ID,
instanceType: 'main',
});
beforeEach(async () => {
license = new License(logger, instanceSettings, mock(), mock(), mock());
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock());
await license.init();
});
test('initializes license manager', async () => {
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('initializes license manager for worker', async () => {
const logger = mockLogger();
license = new License(
logger,
mock<InstanceSettings>({ instanceType: 'worker' }),
@@ -64,22 +66,23 @@ describe('License', () => {
mock(),
);
await license.init();
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('attempts to activate license with provided key', async () => {
@@ -196,7 +199,7 @@ describe('License', () => {
it('should enable renewal', async () => {
config.set('multiMainSetup.enabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@@ -208,7 +211,7 @@ describe('License', () => {
it('should disable renewal', async () => {
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@@ -226,7 +229,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@@ -241,7 +244,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@@ -252,7 +255,7 @@ describe('License', () => {
config.set('multiMainSetup.enabled', true);
config.set('multiMainSetup.instanceType', 'leader');
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@@ -264,7 +267,7 @@ describe('License', () => {
describe('reinit', () => {
it('should reinitialize license manager', async () => {
const license = new License(mock(), mock(), mock(), mock(), mock());
const license = new License(mockLogger(), mock(), mock(), mock(), mock());
await license.init();
const initSpy = jest.spyOn(license, 'init');

View File

@@ -5,6 +5,7 @@ import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service';
import { WaitTracker } from '@/wait-tracker';
import { mockLogger } from '@test/mocking';
jest.useFakeTimers();
@@ -21,7 +22,7 @@ describe('WaitTracker', () => {
let waitTracker: WaitTracker;
beforeEach(() => {
waitTracker = new WaitTracker(
mock(),
mockLogger(),
executionRepository,
mock(),
mock(),

View File

@@ -2,7 +2,12 @@ import 'reflect-metadata';
import { GlobalConfig } from '@n8n/config';
import { Command, Errors } from '@oclif/core';
import { BinaryDataService, InstanceSettings, ObjectStoreService } from 'n8n-core';
import { ApplicationError, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import {
ApplicationError,
ensureError,
ErrorReporterProxy as ErrorReporter,
sleep,
} from 'n8n-workflow';
import { Container } from 'typedi';
import type { AbstractServer } from '@/abstract-server';
@@ -283,8 +288,9 @@ export abstract class BaseCommand extends Command {
this.logger.debug('Attempting license activation');
await this.license.activate(activationKey);
this.logger.debug('License init complete');
} catch (e) {
this.logger.error('Could not activate license', e as Error);
} catch (e: unknown) {
const error = ensureError(e);
this.logger.error('Could not activate license', { error });
}
}
}

View File

@@ -11,13 +11,13 @@ import type { ExecutionRepository } from '@/databases/repositories/execution.rep
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
import type { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import type { Logger } from '@/logging/logger.service';
import type { Telemetry } from '@/telemetry';
import { mockLogger } from '@test/mocking';
import { ConcurrencyQueue } from '../concurrency-queue';
describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const logger = mockLogger();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventService = mock<EventService>();

View File

@@ -8,7 +8,6 @@ import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error
import { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import type { LogMetadata } from '@/logging/types';
import { Telemetry } from '@/telemetry';
import { ConcurrencyQueue } from './concurrency-queue';
@@ -34,6 +33,8 @@ export class ConcurrencyControlService {
private readonly telemetry: Telemetry,
private readonly eventService: EventService,
) {
this.logger = this.logger.withScope('executions');
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
if (this.productionLimit === 0) {
@@ -46,7 +47,6 @@ export class ConcurrencyControlService {
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false;
this.log('Service disabled');
return;
}
@@ -65,12 +65,12 @@ export class ConcurrencyControlService {
});
this.productionQueue.on('execution-throttled', ({ executionId }) => {
this.log('Execution throttled', { executionId });
this.logger.debug('Execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId });
});
this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
this.logger.debug('Execution released', { executionId });
});
}
@@ -144,9 +144,9 @@ export class ConcurrencyControlService {
// ----------------------------------
private logInit() {
this.log('Enabled');
this.logger.debug('Enabled');
this.log(
this.logger.debug(
[
'Production execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
@@ -171,10 +171,6 @@ export class ConcurrencyControlService {
throw new UnknownExecutionModeError(mode);
}
private log(message: string, metadata?: LogMetadata) {
this.logger.debug(['[Concurrency Control]', message].join(' '), metadata);
}
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}

View File

@@ -12,7 +12,9 @@ import { mockInstance, mockEntityManager } from '@test/mocking';
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const globalConfig = mockInstance(GlobalConfig, { logging: { outputs: ['console'] } });
const globalConfig = mockInstance(GlobalConfig, {
logging: { outputs: ['console'], scopes: [] },
});
const binaryDataService = mockInstance(BinaryDataService);
const executionRepository = Container.get(ExecutionRepository);
const mockDate = new Date('2023-12-28 12:34:56.789Z');

View File

@@ -2,7 +2,12 @@
import { In } from '@n8n/typeorm';
import glob from 'fast-glob';
import { Credentials, InstanceSettings } from 'n8n-core';
import { ApplicationError, jsonParse, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ApplicationError,
jsonParse,
ErrorReporterProxy as ErrorReporter,
ensureError,
} from 'n8n-workflow';
import { readFile as fsReadFile } from 'node:fs/promises';
import path from 'path';
import { Container, Service } from 'typedi';
@@ -274,8 +279,9 @@ export class SourceControlImportService {
this.logger.debug(`Reactivating workflow id ${existingWorkflow.id}`);
await workflowManager.add(existingWorkflow.id, 'activate');
// update the versionId of the workflow to match the imported workflow
} catch (error) {
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, { error });
} finally {
await Container.get(WorkflowRepository).update(
{ id: existingWorkflow.id },
@@ -377,8 +383,9 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: { tags: [], mappings: [] } },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error });
return;
}
@@ -444,8 +451,8 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: [] },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error: e });
return;
}
const overriddenKeys = Object.keys(valueOverrides ?? {});

View File

@@ -149,7 +149,7 @@ export class MessageEventBusLogWriter {
this._worker = new Worker(workerFileName);
if (this.worker) {
this.worker.on('messageerror', async (error) => {
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', error);
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', { error });
await MessageEventBusLogWriter.instance.startThread();
});
return true;

View File

@@ -1,5 +1,5 @@
import pick from 'lodash/pick';
import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@@ -95,7 +95,8 @@ export async function updateExistingExecution(parameters: {
);
}
} catch (e) {
logger.error(`Failed to save metadata for execution ID ${executionId}`, e as Error);
const error = ensureError(e);
logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {

View File

@@ -37,7 +37,9 @@ export class License {
private readonly orchestrationService: OrchestrationService,
private readonly settingsRepository: SettingsRepository,
private readonly licenseMetricsService: LicenseMetricsService,
) {}
) {
this.logger = this.logger.withScope('license');
}
/**
* Whether this instance should renew the license - on init and periodically.
@@ -109,9 +111,9 @@ export class License {
await this.manager.initialize();
this.logger.debug('License initialized');
} catch (e: unknown) {
if (e instanceof Error) {
this.logger.error('Could not initialize license manager sdk', e);
} catch (error: unknown) {
if (error instanceof Error) {
this.logger.error('Could not initialize license manager sdk', { error });
}
}
}

View File

@@ -11,6 +11,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@@ -30,6 +31,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['file'],
scopes: [],
file: {
fileSizeMax: 100,
fileCountMax: 16,
@@ -56,6 +58,7 @@ describe('Logger', () => {
logging: {
level: 'error',
outputs: ['console'],
scopes: [],
},
});
@@ -74,6 +77,7 @@ describe('Logger', () => {
logging: {
level: 'warn',
outputs: ['console'],
scopes: [],
},
});
@@ -92,6 +96,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@@ -110,6 +115,7 @@ describe('Logger', () => {
logging: {
level: 'debug',
outputs: ['console'],
scopes: [],
},
});
@@ -128,6 +134,7 @@ describe('Logger', () => {
logging: {
level: 'silent',
outputs: ['console'],
scopes: [],
},
});

View File

@@ -1,5 +1,7 @@
import type { LogScope } from '@n8n/config';
import { GlobalConfig } from '@n8n/config';
import callsites from 'callsites';
import type { TransformableInfo } from 'logform';
import { InstanceSettings } from 'n8n-core';
import { LoggerProxy, LOG_LEVELS } from 'n8n-workflow';
import path, { basename } from 'node:path';
@@ -15,10 +17,16 @@ import type { LogLocationMetadata, LogLevel, LogMetadata } from './types';
@Service()
export class Logger {
private readonly internalLogger: winston.Logger;
private internalLogger: winston.Logger;
private readonly level: LogLevel;
private readonly scopes: Set<LogScope>;
private get isScopingEnabled() {
return this.scopes.size > 0;
}
constructor(
private readonly globalConfig: GlobalConfig,
private readonly instanceSettings: InstanceSettings,
@@ -35,15 +43,30 @@ export class Logger {
if (!isSilent) {
this.setLevel();
const { outputs } = this.globalConfig.logging;
const { outputs, scopes } = this.globalConfig.logging;
if (outputs.includes('console')) this.setConsoleTransport();
if (outputs.includes('file')) this.setFileTransport();
this.scopes = new Set(scopes);
}
LoggerProxy.init(this);
}
private setInternalLogger(internalLogger: winston.Logger) {
this.internalLogger = internalLogger;
}
withScope(scope: LogScope) {
const scopedLogger = new Logger(this.globalConfig, this.instanceSettings);
const childLogger = this.internalLogger.child({ scope });
scopedLogger.setInternalLogger(childLogger);
return scopedLogger;
}
private log(level: LogLevel, message: string, metadata: LogMetadata) {
const location: LogLocationMetadata = {};
@@ -81,11 +104,22 @@ export class Logger {
this.internalLogger.add(new winston.transports.Console({ format }));
}
private scopeFilter() {
return winston.format((info: TransformableInfo & { metadata: LogMetadata }) => {
const shouldIncludeScope = info.metadata.scope && this.scopes.has(info.metadata.scope);
if (this.isScopingEnabled && !shouldIncludeScope) return false;
return info;
})();
}
private debugDevConsoleFormat() {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp({ format: () => this.devTsFormat() }),
winston.format.colorize({ all: true }),
this.scopeFilter(),
winston.format.printf(({ level: _level, message, timestamp, metadata: _metadata }) => {
const SEPARATOR = ' '.repeat(3);
const LOG_LEVEL_COLUMN_WIDTH = 15; // 5 columns + ANSI color codes
@@ -100,6 +134,7 @@ export class Logger {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp(),
this.scopeFilter(),
winston.format.printf(({ level, message, timestamp, metadata }) => {
const _metadata = this.toPrintable(metadata);
return `${timestamp} | ${level.padEnd(5)} | ${message}${_metadata ? ' ' + _metadata : ''}`;

View File

@@ -1,7 +1,14 @@
import type { LogScope } from '@n8n/config';
import type { LOG_LEVELS } from './constants';
export type LogLevel = (typeof LOG_LEVELS)[number];
export type LogLocationMetadata = Partial<{ file: string; function: string }>;
export type LogMetadata = {
[key: string]: unknown;
scope?: LogScope;
file?: string;
function?: string;
};
export type LogMetadata = Record<string, unknown> | Error;
export type LogLocationMetadata = Pick<LogMetadata, 'file' | 'function'>;

View File

@@ -6,7 +6,7 @@ import { ApplicationError } from 'n8n-workflow';
import Container from 'typedi';
import type { OrchestrationService } from '@/services/orchestration.service';
import { mockInstance } from '@test/mocking';
import { mockInstance, mockLogger } from '@test/mocking';
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
import type { JobProcessor } from '../job-processor';
@@ -74,7 +74,7 @@ describe('ScalingService', () => {
instanceSettings.markAsLeader();
scalingService = new ScalingService(
mock(),
mockLogger(),
mock(),
jobProcessor,
globalConfig,

View File

@@ -47,7 +47,9 @@ export class ScalingService {
private readonly instanceSettings: InstanceSettings,
private readonly orchestrationService: OrchestrationService,
private readonly eventService: EventService,
) {}
) {
this.logger = this.logger.withScope('scaling');
}
// #region Lifecycle
@@ -77,7 +79,7 @@ export class ScalingService {
this.scheduleQueueMetrics();
this.logger.debug('[ScalingService] Queue setup completed');
this.logger.debug('Queue setup completed');
}
setupWorker(concurrency: number) {
@@ -91,7 +93,7 @@ export class ScalingService {
// Errors thrown here will be sent to the main instance by bull. Logging
// them out and rethrowing them allows to find out which worker had the
// issue.
this.logger.error('[ScalingService] Executing a job errored', {
this.logger.error('Executing a job errored', {
jobId: job.id,
executionId: job.data.executionId,
error,
@@ -101,19 +103,19 @@ export class ScalingService {
}
});
this.logger.debug('[ScalingService] Worker setup completed');
this.logger.debug('Worker setup completed');
}
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async stop() {
await this.queue.pause(true, true);
this.logger.debug('[ScalingService] Queue paused');
this.logger.debug('Queue paused');
this.stopQueueRecovery();
this.stopQueueMetrics();
this.logger.debug('[ScalingService] Queue recovery and metrics stopped');
this.logger.debug('Queue recovery and metrics stopped');
let count = 0;
@@ -159,7 +161,7 @@ export class ScalingService {
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
this.logger.info(`[ScalingService] Added job ${job.id} (execution ${jobData.executionId})`);
this.logger.info(`Added job ${job.id} (execution ${jobData.executionId})`);
return job;
}
@@ -180,16 +182,16 @@ export class ScalingService {
try {
if (await job.isActive()) {
await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('[ScalingService] Stopped active job', props);
this.logger.debug('Stopped active job', props);
return true;
}
await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
this.logger.debug('[ScalingService] Stopped inactive job', props);
this.logger.debug('Stopped inactive job', props);
return true;
} catch (error: unknown) {
await job.progress({ kind: 'abort-job' });
this.logger.error('[ScalingService] Failed to stop job', { ...props, error });
this.logger.error('Failed to stop job', { ...props, error });
return false;
}
}
@@ -233,12 +235,12 @@ export class ScalingService {
* Even if Redis recovers, worker will remain unable to process jobs.
*/
if (error.message.includes('Error initializing Lua scripts')) {
this.logger.error('[ScalingService] Fatal error initializing worker', { error });
this.logger.error('[ScalingService] Exiting process...');
this.logger.error('Fatal error initializing worker', { error });
this.logger.error('Exiting process...');
process.exit(1);
}
this.logger.error('[ScalingService] Queue errored', { error });
this.logger.error('Queue errored', { error });
throw error;
});
@@ -251,7 +253,7 @@ export class ScalingService {
this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
this.logger.error('[ScalingService] Queue errored', { error });
this.logger.error('Queue errored', { error });
throw error;
});
@@ -361,10 +363,10 @@ export class ScalingService {
const nextWaitMs = await this.recoverFromQueue();
this.scheduleQueueRecovery(nextWaitMs);
} catch (error) {
this.logger.error('[ScalingService] Failed to recover dangling executions from queue', {
this.logger.error('Failed to recover dangling executions from queue', {
msg: this.toErrorMsg(error),
});
this.logger.error('[ScalingService] Retrying...');
this.logger.error('Retrying...');
this.scheduleQueueRecovery();
}
@@ -372,7 +374,7 @@ export class ScalingService {
const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`);
this.logger.debug(`Scheduled queue recovery check for next ${wait}`);
}
private stopQueueRecovery() {
@@ -389,7 +391,7 @@ export class ScalingService {
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
if (storedIds.length === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
@@ -398,23 +400,22 @@ export class ScalingService {
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
if (queuedIds.size === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
if (danglingIds.length === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
await this.executionRepository.markAsCrashed(danglingIds);
this.logger.info(
'[ScalingService] Completed queue recovery check, recovered dangling executions',
{ danglingIds },
);
this.logger.info('Completed queue recovery check, recovered dangling executions', {
danglingIds,
});
// if this cycle used up the whole batch size, it is possible for there to be
// dangling executions outside this check, so speed up next cycle

View File

@@ -88,8 +88,7 @@ export class ImportService {
try {
await replaceInvalidCredentials(workflow);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
this.logger.error('Failed to replace invalid credential', error);
this.logger.error('Failed to replace invalid credential', { error: e });
}
}

View File

@@ -28,7 +28,9 @@ export class WaitTracker {
private readonly ownershipService: OwnershipService,
private readonly workflowRunner: WorkflowRunner,
private readonly orchestrationService: OrchestrationService,
) {}
) {
this.logger = this.logger.withScope('executions');
}
has(executionId: string) {
return this.waitingExecutions[executionId] !== undefined;
@@ -50,7 +52,7 @@ export class WaitTracker {
}
private startTracking() {
this.logger.debug('Wait tracker started tracking waiting executions');
this.logger.debug('Started tracking waiting executions');
// Poll every 60 seconds a list of upcoming executions
this.mainTimer = setInterval(() => {
@@ -61,7 +63,7 @@ export class WaitTracker {
}
async getWaitingExecutions() {
this.logger.debug('Wait tracker querying database for waiting executions');
this.logger.debug('Querying database for waiting executions');
const executions = await this.executionRepository.getWaitingExecutions();
@@ -71,7 +73,7 @@ export class WaitTracker {
const executionIds = executions.map((execution) => execution.id).join(', ');
this.logger.debug(
`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
`Found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
);
// Add timers for each waiting execution that they get started at the correct time
@@ -99,7 +101,7 @@ export class WaitTracker {
}
startExecution(executionId: string) {
this.logger.debug(`Wait tracker resuming execution ${executionId}`, { executionId });
this.logger.debug(`Resuming execution ${executionId}`, { executionId });
delete this.waitingExecutions[executionId];
(async () => {
@@ -141,7 +143,7 @@ export class WaitTracker {
}
stopTracking() {
this.logger.debug('Wait tracker shutting down');
this.logger.debug('Shutting down wait tracking');
clearInterval(this.mainTimer);
Object.keys(this.waitingExecutions).forEach((executionId) => {

View File

@@ -3,13 +3,12 @@ import { mockClear } from 'jest-mock-extended';
import { User } from '@/databases/entities/user';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { WorkflowHistoryRepository } from '@/databases/repositories/workflow-history.repository';
import { Logger } from '@/logging/logger.service';
import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service.ee';
import { mockInstance } from '@test/mocking';
import { mockInstance, mockLogger } from '@test/mocking';
import { getWorkflow } from '@test-integration/workflow';
const workflowHistoryRepository = mockInstance(WorkflowHistoryRepository);
const logger = mockInstance(Logger);
const logger = mockLogger();
const sharedWorkflowRepository = mockInstance(SharedWorkflowRepository);
const workflowHistoryService = new WorkflowHistoryService(
logger,
@@ -106,10 +105,6 @@ describe('WorkflowHistoryService', () => {
// Assert
expect(workflowHistoryRepository.insert).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
'Failed to save workflow history version for workflow 123',
expect.any(Error),
);
});
});
});

View File

@@ -1,3 +1,4 @@
import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';
import type { User } from '@/databases/entities/user';
@@ -79,10 +80,10 @@ export class WorkflowHistoryService {
workflowId,
});
} catch (e) {
this.logger.error(
`Failed to save workflow history version for workflow ${workflowId}`,
e as Error,
);
const error = ensureError(e);
this.logger.error(`Failed to save workflow history version for workflow ${workflowId}`, {
error,
});
}
}
}

View File

@@ -4,6 +4,8 @@ import type { Class } from 'n8n-core';
import type { DeepPartial } from 'ts-essentials';
import { Container } from 'typedi';
import type { Logger } from '@/logging/logger.service';
export const mockInstance = <T>(
serviceClass: Class<T>,
data: DeepPartial<T> | undefined = undefined,
@@ -22,3 +24,6 @@ export const mockEntityManager = (entityClass: Class) => {
Object.assign(entityManager, { connection: dataSource });
return entityManager;
};
export const mockLogger = () =>
mock<Logger>({ withScope: jest.fn().mockReturnValue(mock<Logger>()) });