fix(core): Support task runner in execute and execute-batch commands (#15147)

This commit is contained in:
Iván Ovejero
2025-05-06 17:53:36 +02:00
committed by GitHub
parent 1e5cb55494
commit 985f554501
7 changed files with 196 additions and 15 deletions

View File

@@ -0,0 +1,94 @@
import { GlobalConfig } from '@n8n/config';
import type { User, WorkflowEntity } from '@n8n/db';
import { Container } from '@n8n/di';
import type { SelectQueryBuilder } from '@n8n/typeorm';
import type { Config } from '@oclif/core';
import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import { ExternalHooks } from '@/external-hooks';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { PostHogClient } from '@/posthog';
import { OwnershipService } from '@/services/ownership.service';
import { ShutdownService } from '@/shutdown/shutdown.service';
import { TaskRunnerModule } from '@/task-runners/task-runner-module';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@test/mocking';
import { ExecuteBatch } from '../execute-batch';
const taskRunnerModule = mockInstance(TaskRunnerModule);
const workflowRepository = mockInstance(WorkflowRepository);
const ownershipService = mockInstance(OwnershipService);
const workflowRunner = mockInstance(WorkflowRunner);
const activeExecutions = mockInstance(ActiveExecutions);
const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials);
const shutdownService = mockInstance(ShutdownService);
const deprecationService = mockInstance(DeprecationService);
mockInstance(MessageEventBus);
const posthogClient = mockInstance(PostHogClient);
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
const externalHooks = mockInstance(ExternalHooks);
jest.mock('@/db', () => ({
init: jest.fn().mockResolvedValue(undefined),
migrate: jest.fn().mockResolvedValue(undefined),
connectionState: { connected: false },
close: jest.fn().mockResolvedValue(undefined),
}));
test('should start a task runner when task runners are enabled', async () => {
// arrange
const workflow = mock<WorkflowEntity>({
id: '123',
nodes: [{ type: 'n8n-nodes-base.manualTrigger' }],
});
const run = mock<IRun>({ data: { resultData: { error: undefined } } });
const queryBuilder = mock<SelectQueryBuilder<WorkflowEntity>>({
andWhere: jest.fn().mockReturnThis(),
getMany: jest.fn().mockResolvedValue([workflow]),
});
loadNodesAndCredentials.init.mockResolvedValue(undefined);
shutdownService.shutdown.mockReturnValue();
deprecationService.warn.mockReturnValue();
posthogClient.init.mockResolvedValue();
telemetryEventRelay.init.mockResolvedValue();
externalHooks.init.mockResolvedValue();
workflowRepository.createQueryBuilder.mockReturnValue(queryBuilder);
ownershipService.getInstanceOwner.mockResolvedValue(mock<User>({ id: '123' }));
workflowRunner.run.mockResolvedValue('123');
activeExecutions.getPostExecutePromise.mockResolvedValue(run);
Container.set(
GlobalConfig,
mock<GlobalConfig>({
taskRunners: { enabled: true },
nodes: { communityPackages: { enabled: false } },
}),
);
const cmd = new ExecuteBatch([], {} as Config);
// @ts-expect-error Private property
cmd.parse = jest.fn().mockResolvedValue({ flags: {} });
// @ts-expect-error Private property
cmd.runTests = jest.fn().mockResolvedValue({ summary: { failedExecutions: [] } });
// act
await cmd.init();
await cmd.run();
// assert
expect(taskRunnerModule.start).toHaveBeenCalledTimes(1);
});

View File

@@ -0,0 +1,86 @@
import { GlobalConfig } from '@n8n/config';
import type { User, WorkflowEntity } from '@n8n/db';
import { Container } from '@n8n/di';
import type { Config } from '@oclif/core';
import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import { ExternalHooks } from '@/external-hooks';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { PostHogClient } from '@/posthog';
import { OwnershipService } from '@/services/ownership.service';
import { ShutdownService } from '@/shutdown/shutdown.service';
import { TaskRunnerModule } from '@/task-runners/task-runner-module';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@test/mocking';
import { Execute } from '../execute';
const taskRunnerModule = mockInstance(TaskRunnerModule);
const workflowRepository = mockInstance(WorkflowRepository);
const ownershipService = mockInstance(OwnershipService);
const workflowRunner = mockInstance(WorkflowRunner);
const activeExecutions = mockInstance(ActiveExecutions);
const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials);
const shutdownService = mockInstance(ShutdownService);
const deprecationService = mockInstance(DeprecationService);
mockInstance(MessageEventBus);
const posthogClient = mockInstance(PostHogClient);
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
const externalHooks = mockInstance(ExternalHooks);
jest.mock('@/db', () => ({
init: jest.fn().mockResolvedValue(undefined),
migrate: jest.fn().mockResolvedValue(undefined),
connectionState: { connected: false },
close: jest.fn().mockResolvedValue(undefined),
}));
test('should start a task runner when task runners are enabled', async () => {
// arrange
const workflow = mock<WorkflowEntity>({
id: '123',
nodes: [{ type: 'n8n-nodes-base.manualTrigger' }],
});
const run = mock<IRun>({ data: { resultData: { error: undefined } } });
loadNodesAndCredentials.init.mockResolvedValue(undefined);
shutdownService.shutdown.mockReturnValue();
deprecationService.warn.mockReturnValue();
posthogClient.init.mockResolvedValue();
telemetryEventRelay.init.mockResolvedValue();
externalHooks.init.mockResolvedValue();
workflowRepository.findOneBy.mockResolvedValue(workflow);
ownershipService.getInstanceOwner.mockResolvedValue(mock<User>({ id: '123' }));
workflowRunner.run.mockResolvedValue('123');
activeExecutions.getPostExecutePromise.mockResolvedValue(run);
Container.set(
GlobalConfig,
mock<GlobalConfig>({
taskRunners: { enabled: true },
nodes: { communityPackages: { enabled: false } },
}),
);
const cmd = new Execute([], {} as Config);
// @ts-expect-error Private property
cmd.parse = jest.fn().mockResolvedValue({ flags: { id: '123' } });
// act
await cmd.init();
await cmd.run();
// assert
expect(taskRunnerModule.start).toHaveBeenCalledTimes(1);
});

View File

@@ -69,6 +69,9 @@ export abstract class BaseCommand extends Command {
/** Whether to init community packages (if enabled) */ /** Whether to init community packages (if enabled) */
protected needsCommunityPackages = false; protected needsCommunityPackages = false;
/** Whether to init task runner (if enabled). */
protected needsTaskRunner = false;
protected async loadModules() { protected async loadModules() {
for (const moduleName of this.modulesConfig.modules) { for (const moduleName of this.modulesConfig.modules) {
let preInitModule: ModulePreInit | undefined; let preInitModule: ModulePreInit | undefined;
@@ -156,6 +159,11 @@ export abstract class BaseCommand extends Command {
await Container.get(CommunityPackagesService).checkForMissingPackages(); await Container.get(CommunityPackagesService).checkForMissingPackages();
} }
if (this.needsTaskRunner && this.globalConfig.taskRunners.enabled) {
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
await Container.get(TaskRunnerModule).start();
}
// TODO: remove this after the cyclic dependencies around the event-bus are resolved // TODO: remove this after the cyclic dependencies around the event-bus are resolved
Container.get(MessageEventBus); Container.get(MessageEventBus);

View File

@@ -112,6 +112,8 @@ export class ExecuteBatch extends BaseCommand {
override needsCommunityPackages = true; override needsCommunityPackages = true;
override needsTaskRunner = true;
/** /**
* Gracefully handles exit. * Gracefully handles exit.
* @param {boolean} skipExit Whether to skip exit or number according to received signal * @param {boolean} skipExit Whether to skip exit or number according to received signal
@@ -335,7 +337,6 @@ export class ExecuteBatch extends BaseCommand {
if (results.summary.failedExecutions > 0) { if (results.summary.failedExecutions > 0) {
this.exit(1); this.exit(1);
} }
this.exit(0);
} }
mergeResults(results: IResult, retryResults: IResult) { mergeResults(results: IResult, retryResults: IResult) {

View File

@@ -29,6 +29,8 @@ export class Execute extends BaseCommand {
override needsCommunityPackages = true; override needsCommunityPackages = true;
override needsTaskRunner = true;
async init() { async init() {
await super.init(); await super.init();
await this.initBinaryDataService(); await this.initBinaryDataService();

View File

@@ -68,6 +68,8 @@ export class Start extends BaseCommand {
override needsCommunityPackages = true; override needsCommunityPackages = true;
override needsTaskRunner = true;
private getEditorUrl = () => Container.get(UrlService).getInstanceBaseUrl(); private getEditorUrl = () => Container.get(UrlService).getInstanceBaseUrl();
/** /**
@@ -234,13 +236,6 @@ export class Start extends BaseCommand {
await this.generateStaticAssets(); await this.generateStaticAssets();
} }
const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}
await this.loadModules(); await this.loadModules();
} }

View File

@@ -39,6 +39,8 @@ export class Worker extends BaseCommand {
override needsCommunityPackages = true; override needsCommunityPackages = true;
override needsTaskRunner = true;
/** /**
* Stop n8n in a graceful way. * Stop n8n in a graceful way.
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
@@ -108,13 +110,6 @@ export class Worker extends BaseCommand {
}), }),
); );
const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}
await this.loadModules(); await this.loadModules();
} }