fix: Run evaluations successfully when offload manual executions is true with queue mode (#16307)

Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
This commit is contained in:
Mutasem Aldmour
2025-06-13 15:33:48 +02:00
committed by GitHub
parent 879114b572
commit aa273745ec
7 changed files with 548 additions and 368 deletions

View File

@@ -416,7 +416,7 @@ describe('TestRunnerService', () => {
}
});
test('should call workflowRunner.run with correct data', async () => {
test('should call workflowRunner.run with correct data in normal execution mode', async () => {
// Create workflow with a trigger node
const triggerNodeName = 'Dataset Trigger';
const workflow = mock<IWorkflowBase>({
@@ -460,13 +460,79 @@ describe('TestRunnerService', () => {
expect(runCallArg).toHaveProperty('userId', metadata.userId);
expect(runCallArg).toHaveProperty('partialExecutionVersion', 2);
// Verify node execution stack contains the requestDataset flag
expect(runCallArg).toHaveProperty('executionData.executionData.nodeExecutionStack');
const nodeExecutionStack = runCallArg.executionData?.executionData?.nodeExecutionStack;
expect(nodeExecutionStack).toBeInstanceOf(Array);
expect(nodeExecutionStack).toHaveLength(1);
expect(nodeExecutionStack?.[0]).toHaveProperty('node.name', triggerNodeName);
expect(nodeExecutionStack?.[0]).toHaveProperty('data.main[0][0].json.requestDataset', true);
expect(nodeExecutionStack?.[0]).toHaveProperty('node.forceCustomOperation', {
resource: 'dataset',
operation: 'getRows',
});
expect(nodeExecutionStack?.[0]).toHaveProperty('data.main[0][0].json', {});
expect(runCallArg).toHaveProperty('workflowData.nodes[0].forceCustomOperation', {
resource: 'dataset',
operation: 'getRows',
});
});
test('should call workflowRunner.run with correct data in queue execution mode and manual offload', async () => {
config.set('executions.mode', 'queue');
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true';
// Create workflow with a trigger node
const triggerNodeName = 'Dataset Trigger';
const workflow = mock<IWorkflowBase>({
nodes: [
{
id: 'node1',
name: triggerNodeName,
type: EVALUATION_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
parameters: {},
},
],
connections: {},
settings: {
saveDataErrorExecution: 'all',
},
});
const metadata = {
testRunId: 'test-run-id',
userId: 'user-id',
};
// Call the method
await (testRunnerService as any).runDatasetTrigger(workflow, metadata);
// Verify workflowRunner.run was called
expect(workflowRunner.run).toHaveBeenCalledTimes(1);
// Get the argument passed to workflowRunner.run
const runCallArg = workflowRunner.run.mock.calls[0][0];
// Verify it has the correct structure
expect(runCallArg).toHaveProperty('destinationNode', triggerNodeName);
expect(runCallArg).toHaveProperty('executionMode', 'manual');
expect(runCallArg).toHaveProperty('workflowData.settings.saveManualExecutions', false);
expect(runCallArg).toHaveProperty('workflowData.settings.saveDataErrorExecution', 'none');
expect(runCallArg).toHaveProperty('workflowData.settings.saveDataSuccessExecution', 'none');
expect(runCallArg).toHaveProperty('workflowData.settings.saveExecutionProgress', false);
expect(runCallArg).toHaveProperty('userId', metadata.userId);
expect(runCallArg).toHaveProperty('partialExecutionVersion', 2);
expect(runCallArg).not.toHaveProperty('executionData.executionData');
expect(runCallArg).not.toHaveProperty('executionData.executionData.nodeExecutionStack');
expect(runCallArg).toHaveProperty('workflowData.nodes[0].forceCustomOperation', {
resource: 'dataset',
operation: 'getRows',
});
// after reset
config.set('executions.mode', 'regular');
delete process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS;
});
test('should wait for execution to finish and return result', async () => {
@@ -718,6 +784,7 @@ describe('TestRunnerService', () => {
typeVersion: 1,
position: [0, 0],
parameters: {},
forceCustomOperation: undefined,
},
],
connections: {},

View File

@@ -13,10 +13,10 @@ import type {
IRun,
IWorkflowBase,
IWorkflowExecutionDataProcess,
IExecuteData,
INodeExecutionData,
AssignmentCollectionValue,
GenericValue,
IExecuteData,
} from 'n8n-workflow';
import assert from 'node:assert';
@@ -259,16 +259,11 @@ export class TestRunnerService {
throw new TestRunError('EVALUATION_TRIGGER_NOT_FOUND');
}
// Initialize the input data for dataset trigger
// Provide a flag indicating that we want to get the whole dataset
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push({
node: triggerNode,
data: {
main: [[{ json: { requestDataset: true } }]],
},
source: null,
});
// Call custom operation to fetch the whole dataset
triggerNode.forceCustomOperation = {
resource: 'dataset',
operation: 'getRows',
};
const data: IWorkflowExecutionDataProcess = {
destinationNode: triggerNode.name,
@@ -293,13 +288,6 @@ export class TestRunnerService {
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution: {},
waitingExecutionSource: {},
},
manualData: {
userId: metadata.userId,
partialExecutionVersion: 2,
@@ -313,6 +301,33 @@ export class TestRunnerService {
},
};
if (
!(
config.get('executions.mode') === 'queue' &&
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
) &&
data.executionData
) {
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push({
node: triggerNode,
data: {
main: [[{ json: {} }]],
},
source: null,
});
data.executionData.executionData = {
contextData: {},
metadata: {},
// workflow does not evaluate correctly if this is passed in queue mode with offload manual executions
// but this is expected otherwise in regular execution mode
nodeExecutionStack,
waitingExecution: {},
waitingExecutionSource: {},
};
}
// Trigger the workflow under test with mocked data
const executionId = await this.workflowRunner.run(data);
assert(executionId);