fix(editor): Fix partial chat executions (#15379)

This commit is contained in:
oleg
2025-05-15 17:12:08 +02:00
committed by GitHub
parent 726438d95e
commit b6370fb2ec
8 changed files with 452 additions and 32 deletions

View File

@@ -397,3 +397,7 @@ export function clickContextMenuAction(action: string) {
export function openExecutions() {
cy.getByTestId('radio-button-executions').click();
}
export function clickClearExecutionDataButton() {
cy.getByTestId('clear-execution-data-button').click();
}

View File

@@ -0,0 +1,57 @@
import {
getManualChatMessages,
getManualChatModal,
sendManualChatMessage,
} from '../composables/modals/chat-modal';
import { clickExecuteNode } from '../composables/ndv';
import {
clickZoomToFit,
openNode,
navigateToNewWorkflowPage,
openContextMenu,
clickContextMenuAction,
clickClearExecutionDataButton,
} from '../composables/workflow';
import { clearNotifications } from '../pages/notifications';
describe('AI-812-partial-execs-broken-when-using-chat-trigger', () => {
beforeEach(() => {
navigateToNewWorkflowPage();
cy.createFixtureWorkflow('Test_chat_partial_execution.json');
clearNotifications();
clickZoomToFit();
openContextMenu('Edit Fields');
clickContextMenuAction('deselect_all');
});
// Check if the full execution still behaves as expected after the partial execution tests
afterEach(() => {
clearNotifications();
clickClearExecutionDataButton();
sendManualChatMessage('Test Full Execution');
getManualChatMessages().should('have.length', 4);
getManualChatMessages().should('contain', 'Set 3 with chatInput: Test Full Execution');
});
it('should do partial execution when using chat trigger and clicking NDV execute node', () => {
openNode('Edit Fields1');
clickExecuteNode();
getManualChatModal().should('exist');
sendManualChatMessage('Test Partial Execution');
getManualChatMessages().should('have.length', 2);
getManualChatMessages().should('contain', 'Test Partial Execution');
getManualChatMessages().should('contain', 'Set 2 with chatInput: Test Partial Execution');
});
it('should do partial execution when using chat trigger and context-menu execute node', () => {
openContextMenu('Edit Fields');
clickContextMenuAction('execute');
getManualChatModal().should('exist');
sendManualChatMessage('Test Partial Execution');
getManualChatMessages().should('have.length', 2);
getManualChatMessages().should('contain', 'Test Partial Execution');
getManualChatMessages().should('contain', 'Set 1 with chatInput: Test Partial Execution');
});
});

View File

@@ -0,0 +1,127 @@
{
"nodes": [
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "0c345346-8cef-415c-aa1a-3d3941bb4035",
"name": "text",
"value": "=Set 1 with chatInput: {{ $json.chatInput }}",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [
220,
0
],
"id": "b1584b5b-c17c-4fd9-9b75-dd61f2c4c20d",
"name": "Edit Fields"
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "9a7bd7af-c3fb-4984-b15a-2f805b66ed02",
"name": "text",
"value": "=Set 2 with chatInput: {{ $('When chat message received').item.json.chatInput }}",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [
440,
0
],
"id": "e9e02219-4b6b-48d1-8d3d-2c850362abf2",
"name": "Edit Fields1"
},
{
"parameters": {
"options": {}
},
"type": "@n8n/n8n-nodes-langchain.chatTrigger",
"typeVersion": 1.1,
"position": [
0,
0
],
"id": "c2dd390e-1360-4d6f-a922-4d295246a886",
"name": "When chat message received",
"webhookId": "28da48d8-cef1-4364-b4d6-429212d2e3f6"
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "9a7bd7af-c3fb-4984-b15a-2f805b66ed02",
"name": "text",
"value": "=Set 3 with chatInput: {{ $('When chat message received').item.json.chatInput }}",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [
660,
0
],
"id": "766dba66-a4da-4d84-ad80-ca5579ce91e5",
"name": "Edit Fields2"
}
],
"connections": {
"Edit Fields": {
"main": [
[
{
"node": "Edit Fields1",
"type": "main",
"index": 0
}
]
]
},
"Edit Fields1": {
"main": [
[
{
"node": "Edit Fields2",
"type": "main",
"index": 0
}
]
]
},
"When chat message received": {
"main": [
[
{
"node": "Edit Fields",
"type": "main",
"index": 0
}
]
]
}
},
"pinData": {},
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "27cc9b56542ad45b38725555722c50a1c3fee1670bbb67980558314ee08517c4"
}
}

View File

@@ -254,12 +254,10 @@ describe('ManualExecutionService', () => {
await manualExecutionService.runManually(data, workflow, additionalData, executionId);
expect(mockRun).toHaveBeenCalledWith(
workflow,
undefined, // startNode
undefined, // destinationNode
undefined, // pinData
);
expect(mockRun.mock.calls[0][0]).toBe(workflow);
expect(mockRun.mock.calls[0][1]).toBeUndefined(); // startNode
expect(mockRun.mock.calls[0][2]).toBeUndefined(); // destinationNode
expect(mockRun.mock.calls[0][3]).toBeUndefined(); // pinData
});
it('should use execution start node when available for full execution', async () => {
@@ -297,11 +295,49 @@ describe('ManualExecutionService', () => {
expect(manualExecutionService.getExecutionStartNode).toHaveBeenCalledWith(data, workflow);
expect(mockRun.mock.calls[0][0]).toBe(workflow);
expect(mockRun.mock.calls[0][1]).toBe(startNode); // startNode
expect(mockRun.mock.calls[0][2]).toBeUndefined(); // destinationNode
expect(mockRun.mock.calls[0][3]).toBe(data.pinData); // pinData
});
it('should pass the triggerToStartFrom to workflowExecute.run for full execution', async () => {
const mockTriggerData = mock<ITaskData>();
const triggerNodeName = 'triggerNode';
const data = mock<IWorkflowExecutionDataProcess>({
executionMode: 'manual',
destinationNode: undefined,
pinData: undefined,
triggerToStartFrom: {
name: triggerNodeName,
data: mockTriggerData,
},
});
const startNode = mock<INode>({ name: 'startNode' });
const workflow = mock<Workflow>({
getNode: jest.fn().mockReturnValue(startNode),
});
const additionalData = mock<IWorkflowExecuteAdditionalData>();
const executionId = 'test-execution-id';
jest.spyOn(manualExecutionService, 'getExecutionStartNode').mockReturnValue(startNode);
const mockRun = jest.fn().mockReturnValue('mockRunReturn');
require('n8n-core').WorkflowExecute.mockImplementationOnce(() => ({
run: mockRun,
processRunExecutionData: jest.fn(),
}));
await manualExecutionService.runManually(data, workflow, additionalData, executionId);
expect(mockRun).toHaveBeenCalledWith(
workflow,
startNode, // startNode
undefined, // destinationNode
data.pinData, // pinData
undefined, // pinData
data.triggerToStartFrom, // triggerToStartFrom
);
});
@@ -455,5 +491,110 @@ describe('ManualExecutionService', () => {
}),
);
});
it('should call runPartialWorkflow2 for V2 partial execution with runData and empty startNodes', async () => {
const mockRunData = { nodeA: [{ data: { main: [[{ json: { value: 'test' } }]] } }] };
const destinationNodeName = 'nodeB';
const data = mock<IWorkflowExecutionDataProcess>({
executionMode: 'manual',
runData: mockRunData,
startNodes: [],
partialExecutionVersion: 2,
destinationNode: destinationNodeName,
pinData: {},
dirtyNodeNames: [],
agentRequest: undefined,
});
const workflow = mock<Workflow>({
getNode: jest.fn((name) => mock<INode>({ name })),
});
const additionalData = mock<IWorkflowExecuteAdditionalData>();
const executionId = 'test-exec-id-v2-empty-start';
const mockRunPartialWorkflow2 = jest.fn().mockReturnValue('mockPartial2Return-v2-empty');
(core.WorkflowExecute as jest.Mock).mockImplementationOnce(() => ({
runPartialWorkflow2: mockRunPartialWorkflow2,
processRunExecutionData: jest.fn(),
run: jest.fn(),
runPartialWorkflow: jest.fn(),
}));
await manualExecutionService.runManually(
data,
workflow,
additionalData,
executionId,
data.pinData,
);
expect(mockRunPartialWorkflow2).toHaveBeenCalledWith(
workflow,
mockRunData,
data.pinData,
data.dirtyNodeNames,
destinationNodeName,
data.agentRequest,
);
});
it('should call workflowExecute.run for V1 partial execution with runData and empty startNodes', async () => {
const mockRunData = { nodeA: [{ data: { main: [[{ json: { value: 'test' } }]] } }] };
const data = mock<IWorkflowExecutionDataProcess>({
executionMode: 'manual',
runData: mockRunData,
startNodes: [],
destinationNode: 'nodeC',
pinData: { nodeX: [{ json: {} }] },
triggerToStartFrom: undefined,
});
const determinedStartNode = mock<INode>({ name: 'manualTrigger' });
const destinationNodeMock = mock<INode>({ name: data.destinationNode });
const workflow = mock<Workflow>({
getNode: jest.fn((name) => {
if (name === data.destinationNode) {
return destinationNodeMock;
}
if (name === determinedStartNode.name) {
return determinedStartNode;
}
return null;
}),
getTriggerNodes: jest.fn().mockReturnValue([determinedStartNode]),
});
jest
.spyOn(manualExecutionService, 'getExecutionStartNode')
.mockReturnValue(determinedStartNode);
const additionalData = mock<IWorkflowExecuteAdditionalData>();
const executionId = 'test-exec-id-v1-empty-start';
const mockRun = jest.fn().mockReturnValue('mockRunReturn-v1-empty');
(core.WorkflowExecute as jest.Mock).mockImplementationOnce(() => ({
run: mockRun,
processRunExecutionData: jest.fn(),
runPartialWorkflow: jest.fn(),
runPartialWorkflow2: jest.fn(),
}));
await manualExecutionService.runManually(
data,
workflow,
additionalData,
executionId,
data.pinData,
);
expect(manualExecutionService.getExecutionStartNode).toHaveBeenCalledWith(data, workflow);
expect(mockRun).toHaveBeenCalledWith(
workflow,
determinedStartNode,
data.destinationNode,
data.pinData,
data.triggerToStartFrom,
);
});
});
});

View File

@@ -11,9 +11,12 @@ import {
} from 'n8n-core';
import { MANUAL_TRIGGER_NODE_TYPE } from 'n8n-workflow';
import type {
IExecuteData,
IPinData,
IRun,
IRunExecutionData,
IWaitingForExecution,
IWaitingForExecutionSource,
IWorkflowExecuteAdditionalData,
IWorkflowExecutionDataProcess,
Workflow,
@@ -59,7 +62,7 @@ export class ManualExecutionService {
executionId: string,
pinData?: IPinData,
): PCancelable<IRun> {
if (data.triggerToStartFrom?.data && data.startNodes) {
if (data.triggerToStartFrom?.data && data.startNodes?.length) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
@@ -71,13 +74,22 @@ export class ManualExecutionService {
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
let nodeExecutionStack: IExecuteData[] = [];
let waitingExecution: IWaitingForExecution = {};
let waitingExecutionSource: IWaitingForExecutionSource = {};
if (data.destinationNode !== data.triggerToStartFrom.name) {
const recreatedStack = recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
nodeExecutionStack = recreatedStack.nodeExecutionStack;
waitingExecution = recreatedStack.waitingExecution;
waitingExecutionSource = recreatedStack.waitingExecutionSource;
}
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
@@ -101,8 +113,7 @@ export class ManualExecutionService {
return workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
(data.partialExecutionVersion !== 2 && (!data.startNodes || data.startNodes.length === 0))
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
@@ -143,7 +154,13 @@ export class ManualExecutionService {
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData);
return workflowExecute.run(
workflow,
startNode,
data.destinationNode,
data.pinData,
data.triggerToStartFrom,
);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
@@ -163,7 +180,7 @@ export class ManualExecutionService {
return workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.startNodes ?? [],
data.destinationNode,
data.pinData,
);

View File

@@ -41,6 +41,7 @@ import type {
INodeType,
ITaskStartedData,
AiAgentRequest,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
LoggerProxy as Logger,
@@ -118,6 +119,7 @@ export class WorkflowExecute {
startNode?: INode,
destinationNode?: string,
pinData?: IPinData,
triggerToStartFrom?: IWorkflowExecutionDataProcess['triggerToStartFrom'],
): PCancelable<IRun> {
this.status = 'running';
@@ -139,7 +141,7 @@ export class WorkflowExecute {
const nodeExecutionStack: IExecuteData[] = [
{
node: startNode,
data: {
data: triggerToStartFrom?.data?.data ?? {
main: [
[
{

View File

@@ -359,6 +359,64 @@ describe('useRunWorkflow({ router })', () => {
expect(result).toEqual(mockExecutionResponse);
});
it('should exclude destinationNode from startNodes when provided', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };
const { runWorkflow } = useRunWorkflow({ router });
const dataCaptor = captor<IStartRunData>();
const parentNodeName = 'parentNode';
const destinationNodeName = 'destinationNode';
// Mock workflow with parent-child relationship
const workflow = {
name: 'Test Workflow',
id: 'workflowId',
getParentNodes: vi.fn().mockImplementation((nodeName: string) => {
if (nodeName === destinationNodeName) {
return [parentNodeName];
}
return [];
}),
nodes: {
[parentNodeName]: createTestNode({ name: parentNodeName }),
[destinationNodeName]: createTestNode({ name: destinationNodeName }),
},
} as unknown as Workflow;
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({
id: 'workflowId',
nodes: [],
} as unknown as IWorkflowData);
vi.mocked(workflowsStore).getWorkflowRunData = {
[parentNodeName]: [
{
startTime: 1,
executionTime: 0,
source: [],
data: { main: [[{ json: { test: 'data' } }]] },
},
],
} as unknown as IRunData;
// ACT
await runWorkflow({ destinationNode: destinationNodeName });
// ASSERT
expect(workflowsStore.runWorkflow).toHaveBeenCalledTimes(1);
expect(workflowsStore.runWorkflow).toHaveBeenCalledWith(dataCaptor);
const startNodes = dataCaptor.value.startNodes ?? [];
const destinationInStartNodes = startNodes.some((node) => node.name === destinationNodeName);
expect(destinationInStartNodes).toBe(false);
});
it('should send dirty nodes for partial executions v2', async () => {
vi.mocked(settingsStore).partialExecutionVersion = 2;
const composable = useRunWorkflow({ router });

View File

@@ -163,6 +163,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
let triggerToStartFrom: IStartRunData['triggerToStartFrom'];
if (
startNodeNames.length === 0 &&
directParentNodes.length === 0 &&
'destinationNode' in options &&
options.destinationNode !== undefined
) {
@@ -174,6 +175,8 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
);
newRunData = { [options.triggerNode]: [options.nodeData] };
executedNode = options.triggerNode;
} else if (options.destinationNode) {
executedNode = options.destinationNode;
}
if (options.triggerNode) {
@@ -237,24 +240,35 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
const version = settingsStore.partialExecutionVersion;
// TODO: this will be redundant once we cleanup the partial execution v1
const startNodes: StartNodeData[] = sortNodesByYPosition(startNodeNames).map((name) => {
// Find for each start node the source data
let sourceData = get(runData, [name, 0, 'source', 0], null);
if (sourceData === null) {
const parentNodes = workflow.getParentNodes(name, NodeConnectionTypes.Main, 1);
const executeData = workflowHelpers.executeData(
parentNodes,
const startNodes: StartNodeData[] = sortNodesByYPosition(startNodeNames)
.map((name) => {
// Find for each start node the source data
let sourceData = get(runData, [name, 0, 'source', 0], null);
if (sourceData === null) {
const parentNodes = workflow.getParentNodes(name, NodeConnectionTypes.Main, 1);
const executeData = workflowHelpers.executeData(
parentNodes,
name,
NodeConnectionTypes.Main,
0,
);
sourceData = get(executeData, ['source', NodeConnectionTypes.Main, 0], null);
}
return {
name,
NodeConnectionTypes.Main,
0,
);
sourceData = get(executeData, ['source', NodeConnectionTypes.Main, 0], null);
}
return {
name,
sourceData,
};
});
sourceData,
};
})
// If a destination node is specified and it has chat parent, we don't want to include it in the start nodes
.filter((node) => {
if (
options.destinationNode &&
workflowsStore.checkIfNodeHasChatParent(options.destinationNode)
) {
return node.name !== options.destinationNode;
}
return true;
});
const singleWebhookTrigger = triggers.find((node) =>
SINGLE_WEBHOOK_TRIGGERS.includes(node.type),