mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
fix(core): Prioritize workflow execution with existing execution data on worker (#15165)
This commit is contained in:
committed by
GitHub
parent
c6ceee2bee
commit
12b681fc41
@@ -1,7 +1,8 @@
|
||||
import type { IExecutionResponse } from '@n8n/db';
|
||||
import type { ExecutionRepository } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { Logger } from 'n8n-core';
|
||||
import type { WorkflowExecute as ActualWorkflowExecute } from 'n8n-core';
|
||||
import { type Logger } from 'n8n-core';
|
||||
import { mockInstance } from 'n8n-core/test/utils';
|
||||
import type { IPinData, ITaskData, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
|
||||
import { Workflow, type IRunExecutionData, type WorkflowExecuteMode } from 'n8n-workflow';
|
||||
@@ -27,6 +28,19 @@ mockInstance(WorkflowStaticDataService);
|
||||
mockInstance(WorkflowStatisticsService);
|
||||
mockInstance(ExternalHooks);
|
||||
|
||||
const processRunExecutionDataMock = jest.fn();
|
||||
jest.mock('n8n-core', () => {
|
||||
const original = jest.requireActual('n8n-core');
|
||||
|
||||
// Mock class constructor and prototype methods
|
||||
return {
|
||||
...original,
|
||||
WorkflowExecute: jest.fn().mockImplementation(() => ({
|
||||
processRunExecutionData: processRunExecutionDataMock,
|
||||
})),
|
||||
};
|
||||
});
|
||||
|
||||
const logger = mock<Logger>({
|
||||
scoped: jest.fn().mockImplementation(() => logger),
|
||||
});
|
||||
@@ -62,6 +76,7 @@ describe('JobProcessor', () => {
|
||||
workflowData: { nodes: [] },
|
||||
data: mock<IRunExecutionData>({
|
||||
isTestWebhook: false,
|
||||
executionData: undefined,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
@@ -98,6 +113,7 @@ describe('JobProcessor', () => {
|
||||
},
|
||||
pinData,
|
||||
},
|
||||
executionData: undefined,
|
||||
}),
|
||||
});
|
||||
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
||||
@@ -135,4 +151,56 @@ describe('JobProcessor', () => {
|
||||
pinData,
|
||||
);
|
||||
});
|
||||
|
||||
it.each(['manual', 'evaluation', 'trigger'] satisfies WorkflowExecuteMode[])(
|
||||
'should use workflowExecute to process a job with mode %p with execution data',
|
||||
async (mode) => {
|
||||
const { WorkflowExecute } = await import('n8n-core');
|
||||
// Type it correctly so we can use mock methods later
|
||||
const MockedWorkflowExecute = WorkflowExecute as jest.MockedClass<
|
||||
typeof ActualWorkflowExecute
|
||||
>;
|
||||
MockedWorkflowExecute.mockClear();
|
||||
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
const executionData = mock<IRunExecutionData>({
|
||||
isTestWebhook: false,
|
||||
startData: undefined,
|
||||
executionData: {
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
node: { name: 'node-name' },
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
executionRepository.findSingleExecution.mockResolvedValue(
|
||||
mock<IExecutionResponse>({
|
||||
mode,
|
||||
workflowData: { nodes: [] },
|
||||
data: executionData,
|
||||
}),
|
||||
);
|
||||
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
mock(),
|
||||
executionRepository,
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
manualExecutionService,
|
||||
);
|
||||
|
||||
await jobProcessor.processJob(mock<Job>());
|
||||
|
||||
// Assert the constructor and method were called
|
||||
expect(MockedWorkflowExecute).toHaveBeenCalledWith(additionalData, mode, executionData);
|
||||
expect(processRunExecutionDataMock).toHaveBeenCalled();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -170,7 +170,10 @@ export class JobProcessor {
|
||||
|
||||
const { startData, resultData, manualData, isTestWebhook } = execution.data;
|
||||
|
||||
if (['manual', 'evaluation'].includes(execution.mode) && !isTestWebhook) {
|
||||
if (execution.data?.executionData) {
|
||||
workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data);
|
||||
workflowRun = workflowExecute.processRunExecutionData(workflow);
|
||||
} else if (['manual', 'evaluation'].includes(execution.mode) && !isTestWebhook) {
|
||||
const data: IWorkflowExecutionDataProcess = {
|
||||
executionMode: execution.mode,
|
||||
workflowData: execution.workflowData,
|
||||
@@ -211,9 +214,6 @@ export class JobProcessor {
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
} else if (execution.data !== undefined) {
|
||||
workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data);
|
||||
workflowRun = workflowExecute.processRunExecutionData(workflow);
|
||||
} else {
|
||||
this.errorReporter.info(`Worker found execution ${executionId} without data`);
|
||||
// Execute all nodes
|
||||
|
||||
Reference in New Issue
Block a user