diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 21e277a5dc..a437414469 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,7 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus' import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; -import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager'; +import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; @@ -227,7 +227,7 @@ export class Start extends BaseCommand { } if (!this.globalConfig.taskRunners.disabled) { - Container.set(TaskManager, new SingleMainTaskManager()); + Container.set(TaskManager, new LocalTaskManager()); const { TaskRunnerServer } = await import('@/runners/task-runner-server'); const taskRunnerServer = Container.get(TaskRunnerServer); await taskRunnerServer.start(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index c86cf0936f..438ed9c8c8 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,6 +8,8 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { Logger } from '@/logging/logger.service'; +import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; +import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; @@ -113,6 +115,17 @@ export class Worker extends BaseCommand { }, }), ); + + if (!this.globalConfig.taskRunners.disabled) { + Container.set(TaskManager, new LocalTaskManager()); + const { TaskRunnerServer } = await import('@/runners/task-runner-server'); + const taskRunnerServer = Container.get(TaskRunnerServer); + await taskRunnerServer.start(); + + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); + const runnerProcess = Container.get(TaskRunnerProcess); + await runnerProcess.start(); + } } async initEventBus() { diff --git a/packages/cli/src/runners/task-managers/single-main-task-manager.ts b/packages/cli/src/runners/task-managers/local-task-manager.ts similarity index 92% rename from packages/cli/src/runners/task-managers/single-main-task-manager.ts rename to packages/cli/src/runners/task-managers/local-task-manager.ts index b5b60df72b..a8fca01b2c 100644 --- a/packages/cli/src/runners/task-managers/single-main-task-manager.ts +++ b/packages/cli/src/runners/task-managers/local-task-manager.ts @@ -5,7 +5,7 @@ import type { RequesterMessage } from '../runner-types'; import type { RequesterMessageCallback } from '../task-broker.service'; import { TaskBroker } from '../task-broker.service'; -export class SingleMainTaskManager extends TaskManager { +export class LocalTaskManager extends TaskManager { taskBroker: TaskBroker; id: string = 'single-main'; diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 1840a9b170..8c33755fd7 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -1,6 +1,8 @@ process.argv[2] = 'worker'; +import { TaskRunnersConfig } from '@n8n/config'; import { BinaryDataService } from 'n8n-core'; +import Container from 'typedi'; import { Worker } from '@/commands/worker'; import config from '@/config'; @@ -11,6 +13,8 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Push } from '@/push'; +import { TaskRunnerProcess } from '@/runners/task-runner-process'; +import { TaskRunnerServer } from '@/runners/task-runner-server'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; @@ -22,6 +26,7 @@ import { mockInstance } from '../../shared/mocking'; config.set('executions.mode', 'queue'); config.set('binaryDataManager.availableModes', 'filesystem'); +Container.get(TaskRunnersConfig).disabled = false; mockInstance(LoadNodesAndCredentials); const binaryDataService = mockInstance(BinaryDataService); const externalHooks = mockInstance(ExternalHooks); @@ -31,6 +36,8 @@ const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); +const taskRunnerServer = mockInstance(TaskRunnerServer); +const taskRunnerProcess = mockInstance(TaskRunnerProcess); mockInstance(Publisher); mockInstance(Subscriber); mockInstance(Telemetry); @@ -55,6 +62,8 @@ test('worker initializes all its components', async () => { expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); + expect(taskRunnerServer.start).toHaveBeenCalledTimes(1); + expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1); expect(config.getEnv('executions.mode')).toBe('queue'); });