fix(editor): Waiting executions broken - Chat, Form, Wait (no-changelog) (#15343)

This commit is contained in:
Suguru Inoue
2025-05-13 17:01:00 +02:00
committed by GitHub
parent 3176f6fc89
commit 694af6c9f0
10 changed files with 279 additions and 58 deletions

View File

@@ -120,6 +120,18 @@ export function getNodeByName(name: string) {
); );
} }
export function getNodesWithSpinner() {
return cy
.getByTestId('canvas-node')
.filter((_, el) => Cypress.$(el).find('[data-icon=sync-alt]').length > 0);
}
export function getWaitingNodes() {
return cy
.getByTestId('canvas-node')
.filter((_, el) => Cypress.$(el).find('[data-icon=clock]').length > 0);
}
export function getNodeRenderedTypeByName(name: string) { export function getNodeRenderedTypeByName(name: string) {
return cy.ifCanvasVersion( return cy.ifCanvasVersion(
() => getNodeByName(name), () => getNodeByName(name),

View File

@@ -6,6 +6,7 @@ import * as workflow from '../composables/workflow';
import Workflow_chat from '../fixtures/Workflow_ai_agent.json'; import Workflow_chat from '../fixtures/Workflow_ai_agent.json';
import Workflow_if from '../fixtures/Workflow_if.json'; import Workflow_if from '../fixtures/Workflow_if.json';
import Workflow_loop from '../fixtures/Workflow_loop.json'; import Workflow_loop from '../fixtures/Workflow_loop.json';
import Workflow_wait_for_webhook from '../fixtures/Workflow_wait_for_webhook.json';
describe('Logs', () => { describe('Logs', () => {
beforeEach(() => { beforeEach(() => {
@@ -193,4 +194,42 @@ describe('Logs', () => {
executions.getLogEntries().eq(1).should('contain.text', 'AI Agent'); executions.getLogEntries().eq(1).should('contain.text', 'AI Agent');
executions.getLogEntries().eq(2).should('contain.text', 'E2E Chat Model'); executions.getLogEntries().eq(2).should('contain.text', 'E2E Chat Model');
}); });
it('should show logs for a workflow with a node that waits for webhook', () => {
workflow.navigateToNewWorkflowPage();
workflow.pasteWorkflow(Workflow_wait_for_webhook);
workflow.clickZoomToFit();
logs.openLogsPanel();
workflow.executeWorkflow();
workflow.getNodesWithSpinner().should('contain.text', 'Wait');
workflow.getWaitingNodes().should('contain.text', 'Wait');
logs.getLogEntries().should('have.length', 2);
logs.getLogEntries().eq(0).click(); // click selected row to deselect
logs.getLogEntries().eq(1).should('contain.text', 'Wait node');
logs.getLogEntries().eq(1).should('contain.text', 'Waiting');
workflow.openNode('Wait node');
ndv
.getOutputPanelDataContainer()
.find('a')
.should('have.attr', 'href')
.then((url) => {
cy.request(url as unknown as string).then((response) => {
expect(response.status).to.eq(200);
});
});
ndv.getBackToCanvasButton().click();
workflow.getNodesWithSpinner().should('not.exist');
workflow.getWaitingNodes().should('not.exist');
logs
.getOverviewStatus()
.contains(/Success in [\d\.]+m?s/)
.should('exist');
logs.getLogEntries().should('have.length', 2);
logs.getLogEntries().eq(1).should('contain.text', 'Wait node');
logs.getLogEntries().eq(1).should('contain.text', 'Success');
});
}); });

View File

@@ -0,0 +1,41 @@
{
"nodes": [
{
"parameters": {},
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [0, 0],
"id": "42c6e003-10e7-4100-aff8-8865c49f384c",
"name": "When clicking Test workflow"
},
{
"parameters": {
"resume": "webhook",
"options": {}
},
"type": "n8n-nodes-base.wait",
"typeVersion": 1.1,
"position": [220, 0],
"id": "77614c15-c41e-4b8c-95d6-084d48fed328",
"name": "Wait node",
"webhookId": "62aad98c-81b3-4c44-9adb-b33a23d1271d"
}
],
"connections": {
"When clicking Test workflow": {
"main": [
[
{
"node": "Wait node",
"type": "main",
"index": 0
}
]
]
}
},
"pinData": {},
"meta": {
"instanceId": "eea4a7b09aa7ee308bc067003a65466862f88be8d9309a2bb16297f6bb2616ec"
}
}

View File

@@ -224,7 +224,7 @@ describe('LogsPanel', () => {
expect(rendered.getByText('Running')).toBeInTheDocument(); expect(rendered.getByText('Running')).toBeInTheDocument();
expect(rendered.queryByText('AI Agent')).not.toBeInTheDocument(); expect(rendered.queryByText('AI Agent')).not.toBeInTheDocument();
workflowsStore.addNodeExecutionData({ workflowsStore.addNodeExecutionStartedData({
nodeName: 'AI Agent', nodeName: 'AI Agent',
executionId: '567', executionId: '567',
data: { executionIndex: 0, startTime: Date.parse('2025-04-20T12:34:51.000Z'), source: [] }, data: { executionIndex: 0, startTime: Date.parse('2025-04-20T12:34:51.000Z'), source: [] },

View File

@@ -8,6 +8,7 @@ import { useThrottleFn } from '@vueuse/core';
import { import {
createLogTree, createLogTree,
deepToRaw, deepToRaw,
mergeStartData,
type LatestNodeInfo, type LatestNodeInfo,
type LogEntry, type LogEntry,
} from '@/components/RunDataAi/utils'; } from '@/components/RunDataAi/utils';
@@ -106,11 +107,19 @@ export function useExecutionData() {
() => workflowsStore.workflowExecutionData?.workflowData.id, () => workflowsStore.workflowExecutionData?.workflowData.id,
() => workflowsStore.workflowExecutionData?.status, () => workflowsStore.workflowExecutionData?.status,
() => workflowsStore.workflowExecutionResultDataLastUpdate, () => workflowsStore.workflowExecutionResultDataLastUpdate,
() => workflowsStore.workflowExecutionStartedData,
], ],
useThrottleFn( useThrottleFn(
([executionId], [previousExecutionId]) => { ([executionId], [previousExecutionId]) => {
// Create deep copy to disable reactivity execData.value =
execData.value = deepToRaw(workflowsStore.workflowExecutionData ?? undefined); workflowsStore.workflowExecutionData === null
? undefined
: deepToRaw(
mergeStartData(
workflowsStore.workflowExecutionStartedData?.[1] ?? {},
workflowsStore.workflowExecutionData,
),
); // Create deep copy to disable reactivity
if (executionId !== previousExecutionId) { if (executionId !== previousExecutionId) {
// Reset sub workflow data when top-level execution changes // Reset sub workflow data when top-level execution changes

View File

@@ -14,11 +14,13 @@ import {
getDefaultCollapsedEntries, getDefaultCollapsedEntries,
getTreeNodeData, getTreeNodeData,
getTreeNodeDataV2, getTreeNodeDataV2,
mergeStartData,
} from '@/components/RunDataAi/utils'; } from '@/components/RunDataAi/utils';
import { import {
AGENT_LANGCHAIN_NODE_TYPE, AGENT_LANGCHAIN_NODE_TYPE,
type ExecutionError, type ExecutionError,
type ITaskData, type ITaskData,
type ITaskStartedData,
NodeConnectionTypes, NodeConnectionTypes,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { type LogEntrySelection } from '../CanvasChat/types/logs'; import { type LogEntrySelection } from '../CanvasChat/types/logs';
@@ -1259,7 +1261,7 @@ describe(createLogTree, () => {
executionIndex: 3, executionIndex: 3,
}), }),
createTestTaskData({ createTestTaskData({
startTime: Date.parse('2025-04-04T00:00:03.000Z'), startTime: Date.parse('2025-04-04T00:00:02.000Z'),
executionIndex: 2, executionIndex: 2,
}), }),
], ],
@@ -1318,7 +1320,7 @@ describe(createLogTree, () => {
executionIndex: 3, executionIndex: 3,
}), }),
createTestTaskData({ createTestTaskData({
startTime: Date.parse('2025-04-04T00:00:03.000Z'), startTime: Date.parse('2025-04-04T00:00:02.000Z'),
executionIndex: 2, executionIndex: 2,
}), }),
], ],
@@ -1457,6 +1459,90 @@ describe(deepToRaw, () => {
}); });
}); });
describe(mergeStartData, () => {
it('should return unchanged execution response if start data is empty', () => {
const response = createTestWorkflowExecutionResponse({
data: {
resultData: {
runData: {
A: [createTestTaskData()],
B: [createTestTaskData(), createTestTaskData()],
},
},
},
});
expect(mergeStartData({}, response)).toEqual(response);
});
it('should add runs in start data to the execution response as running state', () => {
const response = createTestWorkflowExecutionResponse({
data: {
resultData: {
runData: {
A: [createTestTaskData({ startTime: 0, executionIndex: 0 })],
B: [
createTestTaskData({ startTime: 1, executionIndex: 1 }),
createTestTaskData({ startTime: 2, executionIndex: 2 }),
],
},
},
},
});
const startData: { [nodeName: string]: ITaskStartedData[] } = {
B: [{ startTime: 3, executionIndex: 3, source: [] }],
C: [{ startTime: 4, executionIndex: 4, source: [] }],
};
const merged = mergeStartData(startData, response);
expect(merged.data?.resultData.runData.A).toEqual(response.data?.resultData.runData.A);
expect(merged.data?.resultData.runData.B).toEqual([
response.data!.resultData.runData.B[0],
response.data!.resultData.runData.B[1],
{ ...startData.B[0], executionStatus: 'running', executionTime: 0 },
]);
expect(merged.data?.resultData.runData.C).toEqual([
{ ...startData.C[0], executionStatus: 'running', executionTime: 0 },
]);
});
it('should not add runs in start data if a run with the same executionIndex already exists in response', () => {
const response = createTestWorkflowExecutionResponse({
data: {
resultData: {
runData: {
A: [createTestTaskData({ executionIndex: 0 })],
},
},
},
});
const startData = {
A: [createTestTaskData({ executionIndex: 0 })],
};
const merged = mergeStartData(startData, response);
expect(merged.data?.resultData.runData.A).toEqual(response.data?.resultData.runData.A);
});
it('should not add runs in start data if a run for the same node with larger start time already exists in response', () => {
const response = createTestWorkflowExecutionResponse({
data: {
resultData: {
runData: {
A: [createTestTaskData({ startTime: 1, executionIndex: 1 })],
},
},
},
});
const startData = {
A: [createTestTaskData({ startTime: 0, executionIndex: 0 })],
};
const merged = mergeStartData(startData, response);
expect(merged.data?.resultData.runData.A).toEqual(response.data?.resultData.runData.A);
});
});
describe(getDefaultCollapsedEntries, () => { describe(getDefaultCollapsedEntries, () => {
it('should recursively find logs for runs with a sub execution and has no child logs', () => { it('should recursively find logs for runs with a sub execution and has no child logs', () => {
const entries = [ const entries = [

View File

@@ -11,6 +11,7 @@ import {
type ITaskDataConnections, type ITaskDataConnections,
type NodeConnectionType, type NodeConnectionType,
type Workflow, type Workflow,
type ITaskStartedData,
type IRunExecutionData, type IRunExecutionData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { type LogEntrySelection } from '../CanvasChat/types/logs'; import { type LogEntrySelection } from '../CanvasChat/types/logs';
@@ -397,17 +398,7 @@ function getTreeNodeDataRecV2(
runIndex: number | undefined, runIndex: number | undefined,
): LogEntry[] { ): LogEntry[] {
const treeNode = createNodeV2(node, context, runIndex ?? 0, runData); const treeNode = createNodeV2(node, context, runIndex ?? 0, runData);
const children = getChildNodes(treeNode, node, runIndex, context).sort((a, b) => { const children = getChildNodes(treeNode, node, runIndex, context).sort(sortLogEntries);
// Sort the data by execution index or start time
if (a.runData.executionIndex !== undefined && b.runData.executionIndex !== undefined) {
return a.runData.executionIndex - b.runData.executionIndex;
}
const aTime = a.runData.startTime ?? 0;
const bTime = b.runData.startTime ?? 0;
return aTime - bTime;
});
treeNode.children = children; treeNode.children = children;
@@ -483,23 +474,15 @@ function createLogTreeRec(context: LogTreeCreationContext) {
? [] // skip sub nodes and disabled nodes ? [] // skip sub nodes and disabled nodes
: taskData.map((task, runIndex) => ({ : taskData.map((task, runIndex) => ({
nodeName, nodeName,
task, runData: task,
runIndex, runIndex,
nodeHasMultipleRuns: taskData.length > 1, nodeHasMultipleRuns: taskData.length > 1,
})), })),
) )
.sort((a, b) => { .sort(sortLogEntries);
if (a.task.executionIndex !== undefined && b.task.executionIndex !== undefined) {
return a.task.executionIndex - b.task.executionIndex;
}
return a.nodeName === b.nodeName return runs.flatMap(({ nodeName, runIndex, runData, nodeHasMultipleRuns }) =>
? a.runIndex - b.runIndex getTreeNodeDataV2(nodeName, runData, nodeHasMultipleRuns ? runIndex : undefined, context),
: a.task.startTime - b.task.startTime;
});
return runs.flatMap(({ nodeName, runIndex, task, nodeHasMultipleRuns }) =>
getTreeNodeDataV2(nodeName, task, nodeHasMultipleRuns ? runIndex : undefined, context),
); );
} }
@@ -595,6 +578,65 @@ export function flattenLogEntries(
return ret; return ret;
} }
function sortLogEntries<T extends { runData: ITaskData }>(a: T, b: T) {
// We rely on execution index only when startTime is different
// Because it is reset to 0 when execution is waited, and therefore not necessarily unique
if (a.runData.startTime === b.runData.startTime) {
return a.runData.executionIndex - b.runData.executionIndex;
}
return a.runData.startTime - b.runData.startTime;
}
export function mergeStartData(
startData: { [nodeName: string]: ITaskStartedData[] },
response: IExecutionResponse,
): IExecutionResponse {
if (!response.data) {
return response;
}
const nodeNames = [
...new Set(
Object.keys(startData).concat(Object.keys(response.data.resultData.runData)),
).values(),
];
const runData = Object.fromEntries(
nodeNames.map<[string, ITaskData[]]>((nodeName) => {
const tasks = response.data?.resultData.runData[nodeName] ?? [];
const mergedTasks = tasks.concat(
(startData[nodeName] ?? [])
.filter((task) =>
// To remove duplicate runs, we check start time in addition to execution index
// because nodes such as Wait and Form emits multiple websocket events with
// different execution index for a single run
tasks.every(
(t) => t.startTime < task.startTime && t.executionIndex !== task.executionIndex,
),
)
.map<ITaskData>((task) => ({
...task,
executionTime: 0,
executionStatus: 'running',
})),
);
return [nodeName, mergedTasks];
}),
);
return {
...response,
data: {
...response.data,
resultData: {
...response.data.resultData,
runData,
},
},
};
}
export function hasSubExecution(entry: LogEntry): boolean { export function hasSubExecution(entry: LogEntry): boolean {
return !!entry.runData.metadata?.subExecution; return !!entry.runData.metadata?.subExecution;
} }

View File

@@ -21,7 +21,6 @@ import type {
ExpressionError, ExpressionError,
IDataObject, IDataObject,
IRunExecutionData, IRunExecutionData,
IRunData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus'; import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
import { h } from 'vue'; import { h } from 'vue';
@@ -467,8 +466,6 @@ export function setRunExecutionData(
runExecutionData.resultData.runData = workflowsStore.getWorkflowRunData; runExecutionData.resultData.runData = workflowsStore.getWorkflowRunData;
} }
removeRunningTaskData(runExecutionData.resultData.runData);
workflowsStore.executingNode.length = 0; workflowsStore.executingNode.length = 0;
workflowsStore.setWorkflowExecutionData({ workflowsStore.setWorkflowExecutionData({
@@ -508,11 +505,3 @@ export function setRunExecutionData(
const lineNumber = runExecutionData.resultData?.error?.lineNumber; const lineNumber = runExecutionData.resultData?.error?.lineNumber;
codeNodeEditorEventBus.emit('highlightLine', lineNumber ?? 'last'); codeNodeEditorEventBus.emit('highlightLine', lineNumber ?? 'last');
} }
function removeRunningTaskData(runData: IRunData): void {
for (const [nodeName, taskItems] of Object.entries(runData)) {
if (taskItems.some((item) => item.executionStatus === 'running')) {
runData[nodeName] = taskItems.filter((item) => item.executionStatus !== 'running');
}
}
}

View File

@@ -8,5 +8,5 @@ export async function nodeExecuteBefore({ data }: NodeExecuteBefore) {
const workflowsStore = useWorkflowsStore(); const workflowsStore = useWorkflowsStore();
workflowsStore.addExecutingNode(data.nodeName); workflowsStore.addExecutingNode(data.nodeName);
workflowsStore.addNodeExecutionData(data); workflowsStore.addNodeExecutionStartedData(data);
} }

View File

@@ -55,6 +55,7 @@ import type {
ITaskData, ITaskData,
IWorkflowSettings, IWorkflowSettings,
INodeType, INodeType,
ITaskStartedData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
deepCopy, deepCopy,
@@ -141,6 +142,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const activeWorkflowExecution = ref<ExecutionSummary | null>(null); const activeWorkflowExecution = ref<ExecutionSummary | null>(null);
const currentWorkflowExecutions = ref<ExecutionSummary[]>([]); const currentWorkflowExecutions = ref<ExecutionSummary[]>([]);
const workflowExecutionData = ref<IExecutionResponse | null>(null); const workflowExecutionData = ref<IExecutionResponse | null>(null);
const workflowExecutionStartedData =
ref<[executionId: string, data: { [nodeName: string]: ITaskStartedData[] }]>();
const workflowExecutionResultDataLastUpdate = ref<number>(); const workflowExecutionResultDataLastUpdate = ref<number>();
const workflowExecutionPairedItemMappings = ref<Record<string, Set<string>>>({}); const workflowExecutionPairedItemMappings = ref<Record<string, Set<string>>>({});
const subWorkflowExecutionError = ref<Error | null>(null); const subWorkflowExecutionError = ref<Error | null>(null);
@@ -868,6 +871,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
workflowExecutionData.value = workflowResultData; workflowExecutionData.value = workflowResultData;
workflowExecutionPairedItemMappings.value = getPairedItemsMapping(workflowResultData); workflowExecutionPairedItemMappings.value = getPairedItemsMapping(workflowResultData);
workflowExecutionResultDataLastUpdate.value = Date.now(); workflowExecutionResultDataLastUpdate.value = Date.now();
workflowExecutionStartedData.value = undefined;
} }
function setWorkflowExecutionRunData(workflowResultData: IRunExecutionData) { function setWorkflowExecutionRunData(workflowResultData: IRunExecutionData) {
@@ -877,6 +881,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
data: workflowResultData, data: workflowResultData,
}; };
workflowExecutionResultDataLastUpdate.value = Date.now(); workflowExecutionResultDataLastUpdate.value = Date.now();
workflowExecutionStartedData.value = undefined;
} }
} }
@@ -1484,24 +1489,19 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
return testUrl; return testUrl;
} }
function addNodeExecutionData(data: NodeExecuteBefore['data']): void { function addNodeExecutionStartedData(data: NodeExecuteBefore['data']): void {
if (settingsStore.isNewLogsEnabled) { const currentData =
const node = getNodeByName(data.nodeName); workflowExecutionStartedData.value?.[0] === data.executionId
if (!node || !workflowExecutionData.value?.data) { ? workflowExecutionStartedData.value?.[1]
return; : {};
}
if (workflowExecutionData.value.data.resultData.runData[data.nodeName] === undefined) { workflowExecutionStartedData.value = [
workflowExecutionData.value.data.resultData.runData[data.nodeName] = []; data.executionId,
} {
...currentData,
workflowExecutionData.value.data.resultData.runData[data.nodeName].push({ [data.nodeName]: [...(currentData[data.nodeName] ?? []), data.data],
executionStatus: 'running', },
executionTime: 0, ];
...data.data,
});
workflowExecutionResultDataLastUpdate.value = Date.now();
}
} }
function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void { function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
@@ -1533,6 +1533,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const tasksData = workflowExecutionData.value.data!.resultData.runData[nodeName]; const tasksData = workflowExecutionData.value.data!.resultData.runData[nodeName];
if (isNodeWaiting) { if (isNodeWaiting) {
tasksData.push(data); tasksData.push(data);
workflowExecutionResultDataLastUpdate.value = Date.now();
if ( if (
node.type === FORM_NODE_TYPE || node.type === FORM_NODE_TYPE ||
(node.type === WAIT_NODE_TYPE && node.parameters.resume === 'form') (node.type === WAIT_NODE_TYPE && node.parameters.resume === 'form')
@@ -1843,6 +1845,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
workflowExecutionData, workflowExecutionData,
workflowExecutionPairedItemMappings, workflowExecutionPairedItemMappings,
workflowExecutionResultDataLastUpdate, workflowExecutionResultDataLastUpdate,
workflowExecutionStartedData,
activeExecutionId: readonlyActiveExecutionId, activeExecutionId: readonlyActiveExecutionId,
previousExecutionId: readonlyPreviousExecutionId, previousExecutionId: readonlyPreviousExecutionId,
setActiveExecutionId, setActiveExecutionId,
@@ -1911,7 +1914,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
makeNewWorkflowShareable, makeNewWorkflowShareable,
resetWorkflow, resetWorkflow,
resetState, resetState,
addNodeExecutionData, addNodeExecutionStartedData,
addExecutingNode, addExecutingNode,
removeExecutingNode, removeExecutingNode,
setWorkflowId, setWorkflowId,