mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
refactor(core): Simplify worker execution path (#15253)
This commit is contained in:
@@ -53,7 +53,6 @@ describe('JobProcessor', () => {
|
|||||||
);
|
);
|
||||||
const jobProcessor = new JobProcessor(
|
const jobProcessor = new JobProcessor(
|
||||||
logger,
|
logger,
|
||||||
mock(),
|
|
||||||
executionRepository,
|
executionRepository,
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -75,7 +74,6 @@ describe('JobProcessor', () => {
|
|||||||
mode,
|
mode,
|
||||||
workflowData: { nodes: [] },
|
workflowData: { nodes: [] },
|
||||||
data: mock<IRunExecutionData>({
|
data: mock<IRunExecutionData>({
|
||||||
isTestWebhook: false,
|
|
||||||
executionData: undefined,
|
executionData: undefined,
|
||||||
}),
|
}),
|
||||||
}),
|
}),
|
||||||
@@ -84,7 +82,6 @@ describe('JobProcessor', () => {
|
|||||||
const manualExecutionService = mock<ManualExecutionService>();
|
const manualExecutionService = mock<ManualExecutionService>();
|
||||||
const jobProcessor = new JobProcessor(
|
const jobProcessor = new JobProcessor(
|
||||||
logger,
|
logger,
|
||||||
mock(),
|
|
||||||
executionRepository,
|
executionRepository,
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -105,7 +102,6 @@ describe('JobProcessor', () => {
|
|||||||
mode: 'manual',
|
mode: 'manual',
|
||||||
workflowData: { nodes: [], pinData },
|
workflowData: { nodes: [], pinData },
|
||||||
data: mock<IRunExecutionData>({
|
data: mock<IRunExecutionData>({
|
||||||
isTestWebhook: false,
|
|
||||||
resultData: {
|
resultData: {
|
||||||
runData: {
|
runData: {
|
||||||
trigger: [mock<ITaskData>({ executionIndex: 1 })],
|
trigger: [mock<ITaskData>({ executionIndex: 1 })],
|
||||||
@@ -124,7 +120,6 @@ describe('JobProcessor', () => {
|
|||||||
const manualExecutionService = mock<ManualExecutionService>();
|
const manualExecutionService = mock<ManualExecutionService>();
|
||||||
const jobProcessor = new JobProcessor(
|
const jobProcessor = new JobProcessor(
|
||||||
logger,
|
logger,
|
||||||
mock(),
|
|
||||||
executionRepository,
|
executionRepository,
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -164,7 +159,6 @@ describe('JobProcessor', () => {
|
|||||||
|
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const executionData = mock<IRunExecutionData>({
|
const executionData = mock<IRunExecutionData>({
|
||||||
isTestWebhook: false,
|
|
||||||
startData: undefined,
|
startData: undefined,
|
||||||
executionData: {
|
executionData: {
|
||||||
nodeExecutionStack: [
|
nodeExecutionStack: [
|
||||||
@@ -188,7 +182,6 @@ describe('JobProcessor', () => {
|
|||||||
const manualExecutionService = mock<ManualExecutionService>();
|
const manualExecutionService = mock<ManualExecutionService>();
|
||||||
const jobProcessor = new JobProcessor(
|
const jobProcessor = new JobProcessor(
|
||||||
logger,
|
logger,
|
||||||
mock(),
|
|
||||||
executionRepository,
|
executionRepository,
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
|
|||||||
@@ -1,13 +1,7 @@
|
|||||||
import type { RunningJobSummary } from '@n8n/api-types';
|
import type { RunningJobSummary } from '@n8n/api-types';
|
||||||
import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
||||||
import { Service } from '@n8n/di';
|
import { Service } from '@n8n/di';
|
||||||
import {
|
import { WorkflowHasIssuesError, InstanceSettings, WorkflowExecute, Logger } from 'n8n-core';
|
||||||
WorkflowHasIssuesError,
|
|
||||||
InstanceSettings,
|
|
||||||
WorkflowExecute,
|
|
||||||
ErrorReporter,
|
|
||||||
Logger,
|
|
||||||
} from 'n8n-core';
|
|
||||||
import type {
|
import type {
|
||||||
ExecutionStatus,
|
ExecutionStatus,
|
||||||
IExecuteResponsePromiseData,
|
IExecuteResponsePromiseData,
|
||||||
@@ -41,7 +35,6 @@ export class JobProcessor {
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly errorReporter: ErrorReporter,
|
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly workflowRepository: WorkflowRepository,
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
private readonly nodeTypes: NodeTypes,
|
private readonly nodeTypes: NodeTypes,
|
||||||
@@ -167,12 +160,12 @@ export class JobProcessor {
|
|||||||
let workflowExecute: WorkflowExecute;
|
let workflowExecute: WorkflowExecute;
|
||||||
let workflowRun: PCancelable<IRun>;
|
let workflowRun: PCancelable<IRun>;
|
||||||
|
|
||||||
const { startData, resultData, manualData, isTestWebhook } = execution.data;
|
const { startData, resultData, manualData } = execution.data;
|
||||||
|
|
||||||
if (execution.data?.executionData) {
|
if (execution.data?.executionData) {
|
||||||
workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data);
|
workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data);
|
||||||
workflowRun = workflowExecute.processRunExecutionData(workflow);
|
workflowRun = workflowExecute.processRunExecutionData(workflow);
|
||||||
} else if (['manual', 'evaluation'].includes(execution.mode) && !isTestWebhook) {
|
} else {
|
||||||
const data: IWorkflowExecutionDataProcess = {
|
const data: IWorkflowExecutionDataProcess = {
|
||||||
executionMode: execution.mode,
|
executionMode: execution.mode,
|
||||||
workflowData: execution.workflowData,
|
workflowData: execution.workflowData,
|
||||||
@@ -213,12 +206,6 @@ export class JobProcessor {
|
|||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
this.errorReporter.info(`Worker found execution ${executionId} without data`);
|
|
||||||
// Execute all nodes
|
|
||||||
// Can execute without webhook so go on
|
|
||||||
workflowExecute = new WorkflowExecute(additionalData, execution.mode);
|
|
||||||
workflowRun = workflowExecute.run(workflow);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const runningJob: RunningJob = {
|
const runningJob: RunningJob = {
|
||||||
|
|||||||
@@ -200,27 +200,6 @@ describe('WaitingForms', () => {
|
|||||||
expect(result).toBe('Form2');
|
expect(result).toBe('Form2');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should mark as test form webhook when execution mode is manual', async () => {
|
|
||||||
jest
|
|
||||||
// @ts-expect-error Protected method
|
|
||||||
.spyOn(waitingForms, 'getWebhookExecutionData')
|
|
||||||
// @ts-expect-error Protected method
|
|
||||||
.mockResolvedValue(mock<IWebhookResponseCallbackData>());
|
|
||||||
|
|
||||||
const execution = mock<IExecutionResponse>({
|
|
||||||
finished: false,
|
|
||||||
mode: 'manual',
|
|
||||||
data: {
|
|
||||||
resultData: { lastNodeExecuted: 'someNode', error: undefined },
|
|
||||||
},
|
|
||||||
});
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
|
||||||
|
|
||||||
await waitingForms.executeWebhook(mock<WaitingWebhookRequest>(), mock<express.Response>());
|
|
||||||
|
|
||||||
expect(execution.data.isTestWebhook).toBe(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should return status of execution if suffix is WAITING_FORMS_EXECUTION_STATUS', async () => {
|
it('should return status of execution if suffix is WAITING_FORMS_EXECUTION_STATUS', async () => {
|
||||||
const execution = mock<IExecutionResponse>({
|
const execution = mock<IExecutionResponse>({
|
||||||
status: 'success',
|
status: 'success',
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import { mock } from 'jest-mock-extended';
|
|||||||
import { ConflictError } from '@/errors/response-errors/conflict.error';
|
import { ConflictError } from '@/errors/response-errors/conflict.error';
|
||||||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||||
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
||||||
import type { IWebhookResponseCallbackData, WaitingWebhookRequest } from '@/webhooks/webhook.types';
|
import type { WaitingWebhookRequest } from '@/webhooks/webhook.types';
|
||||||
|
|
||||||
describe('WaitingWebhooks', () => {
|
describe('WaitingWebhooks', () => {
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
@@ -79,25 +79,4 @@ describe('WaitingWebhooks', () => {
|
|||||||
*/
|
*/
|
||||||
await expect(promise).rejects.toThrowError(ConflictError);
|
await expect(promise).rejects.toThrowError(ConflictError);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should mark as test webhook when execution mode is manual', async () => {
|
|
||||||
jest
|
|
||||||
// @ts-expect-error Protected method
|
|
||||||
.spyOn(waitingWebhooks, 'getWebhookExecutionData')
|
|
||||||
// @ts-expect-error Protected method
|
|
||||||
.mockResolvedValue(mock<IWebhookResponseCallbackData>());
|
|
||||||
|
|
||||||
const execution = mock<IExecutionResponse>({
|
|
||||||
finished: false,
|
|
||||||
mode: 'manual',
|
|
||||||
data: {
|
|
||||||
resultData: { lastNodeExecuted: 'someNode', error: undefined },
|
|
||||||
},
|
|
||||||
});
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
|
||||||
|
|
||||||
await waitingWebhooks.executeWebhook(mock<WaitingWebhookRequest>(), mock<express.Response>());
|
|
||||||
|
|
||||||
expect(execution.data.isTestWebhook).toBe(true);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -137,12 +137,6 @@ export class WaitingForms extends WaitingWebhooks {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A manual execution resumed by a webhook call needs to be marked as such
|
|
||||||
* so workers in scaling mode reuse the existing execution data.
|
|
||||||
*/
|
|
||||||
if (execution.mode === 'manual') execution.data.isTestWebhook = true;
|
|
||||||
|
|
||||||
return await this.getWebhookExecutionData({
|
return await this.getWebhookExecutionData({
|
||||||
execution,
|
execution,
|
||||||
req,
|
req,
|
||||||
|
|||||||
@@ -121,12 +121,6 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||||||
|
|
||||||
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
|
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
|
||||||
|
|
||||||
/**
|
|
||||||
* A manual execution resumed by a webhook call needs to be marked as such
|
|
||||||
* so workers in scaling mode reuse the existing execution data.
|
|
||||||
*/
|
|
||||||
if (execution.mode === 'manual') execution.data.isTestWebhook = true;
|
|
||||||
|
|
||||||
return await this.getWebhookExecutionData({
|
return await this.getWebhookExecutionData({
|
||||||
execution,
|
execution,
|
||||||
req,
|
req,
|
||||||
|
|||||||
@@ -44,11 +44,9 @@ import {
|
|||||||
UnexpectedError,
|
UnexpectedError,
|
||||||
WAIT_NODE_TYPE,
|
WAIT_NODE_TYPE,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import assert from 'node:assert';
|
|
||||||
import { finished } from 'stream/promises';
|
import { finished } from 'stream/promises';
|
||||||
|
|
||||||
import { ActiveExecutions } from '@/active-executions';
|
import { ActiveExecutions } from '@/active-executions';
|
||||||
import config from '@/config';
|
|
||||||
import { MCP_TRIGGER_NODE_TYPE } from '@/constants';
|
import { MCP_TRIGGER_NODE_TYPE } from '@/constants';
|
||||||
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
|
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
|
||||||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||||
@@ -638,15 +636,6 @@ export async function executeWebhook(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
|
||||||
config.getEnv('executions.mode') === 'queue' &&
|
|
||||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true' &&
|
|
||||||
runData.executionMode === 'manual'
|
|
||||||
) {
|
|
||||||
assert(runData.executionData);
|
|
||||||
runData.executionData.isTestWebhook = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start now to run the workflow
|
// Start now to run the workflow
|
||||||
executionId = await Container.get(WorkflowRunner).run(
|
executionId = await Container.get(WorkflowRunner).run(
|
||||||
runData,
|
runData,
|
||||||
|
|||||||
@@ -2164,9 +2164,6 @@ export interface IRunExecutionData {
|
|||||||
waitTill?: Date;
|
waitTill?: Date;
|
||||||
pushRef?: string;
|
pushRef?: string;
|
||||||
|
|
||||||
/** Whether this execution was started by a test webhook call. */
|
|
||||||
isTestWebhook?: boolean;
|
|
||||||
|
|
||||||
/** Data needed for a worker to run a manual execution. */
|
/** Data needed for a worker to run a manual execution. */
|
||||||
manualData?: Pick<
|
manualData?: Pick<
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
|
|||||||
Reference in New Issue
Block a user